# Source code for draugr.tensorboard_utilities.exporting.event_export

import enum
from enum import Enum
from io import BytesIO
from itertools import zip_longest
from pathlib import Path
from pickle import dump
from typing import Iterable, Mapping, Optional, Tuple, TypeVar, Union

import numpy
import pandas

from PIL.Image import Image
from matplotlib import pyplot


from apppath import AppPath, ensure_existence

__all__ = ["TensorboardEventExporter"]

from warg import passes_kws_to

# TODO: implement export options using ExportMethodEnum
# TODO: MAJOR REFACTOR INCOMING

TagTypeEnum = TypeVar("TagTypeEnum")


class TensorboardEventExporter:
    """
    Reads event files and exports the requested tags.

    The events are loaded once, in ``__init__``, through TensorBoard's
    ``EventAccumulator``. Every ``export_*`` / ``*_export_csv`` method validates the
    requested tags, returns the exported data as a tuple and, when ``save_to_disk``
    is true, additionally writes it below ``out_dir``.

    Tags may be passed either as separate positional arguments or as one single
    non-string iterable, e.g. ``export_scalar("a", "b")`` or
    ``export_scalar(["a", "b"])``."""

    # A static TagTypeEnum (images, scalars, tensors, audio, distributions, graph,
    # meta_graph, histograms, run_metadata) would not adapt to plugins, which is why
    # the enum is built dynamically in __init__ from the tags that were found.

    def __init__(
        self,
        path_to_events_file_s: Path,
        size_guidance: Optional[Union[Mapping, int]] = None,
        *,
        save_to_disk: bool = False,
    ):
        """
        Load the events found at ``path_to_events_file_s``.

        :param path_to_events_file_s: path handed (as a string) to ``EventAccumulator``
        :param size_guidance: either a mapping that is passed on unchanged, or an integer.
          ``None`` and integers ``<= 0`` select ``STORE_EVERYTHING_SIZE_GUIDANCE``; a positive
          integer is used as the limit for every entry of that mapping.
        :param save_to_disk: whether the export methods also write their result to disk
        :raises TypeError: if ``size_guidance`` is neither a mapping nor an integer"""
        # Imported lazily so that importing this module does not require tensorboard.
        from tensorboard.backend.event_processing import event_accumulator

        if size_guidance is None:
            size_guidance = 0

        if isinstance(size_guidance, Mapping):
            pass  # an explicit mapping is used as is
        elif isinstance(size_guidance, int):
            store_everything = event_accumulator.STORE_EVERYTHING_SIZE_GUIDANCE
            if size_guidance > 0:
                # A positive integer limits every event type to that many entries.
                size_guidance = {key: size_guidance for key in store_everything}
            else:
                size_guidance = store_everything
        else:
            raise TypeError(f"Invalid type of size guidance {type(size_guidance)}")

        self.path_to_events_file = str(path_to_events_file_s)
        self.event_acc = event_accumulator.EventAccumulator(
            self.path_to_events_file, size_guidance=size_guidance
        )
        self.event_acc.Reload()
        self.tags_available = self.event_acc.Tags()
        self.save_to_disk = save_to_disk

        tags_dict = {}
        for tag_type in self.tags_available:
            # TODO: Automatic but not nice for code completion
            setattr(self, f"available_{tag_type}", self.tags_available[tag_type])
            tags_dict[str(tag_type)] = str(tag_type)

        # Dynamic version of the tag type enum. NOTE: it is stored on the class, so it
        # always reflects the most recently constructed exporter.
        TensorboardEventExporter.TagTypeEnum = enum.Enum("TagTypeEnum", tags_dict)

    @staticmethod
    def _normalise_tags(tags: Iterable) -> Tuple:
        """Return the tags as a flat tuple, unwrapping a single non-string iterable argument."""
        tags = tuple(tags)
        if (
            len(tags) == 1
            and isinstance(tags[0], Iterable)
            and not isinstance(tags[0], str)
        ):
            return tuple(tags[0])
        return tags

    @staticmethod
    def _target_path(out_dir: Optional[Path], file_name: str) -> Path:
        """Return ``out_dir / file_name`` (cwd when ``out_dir`` is None) with its parent directories created."""
        base = Path.cwd() if out_dir is None else Path(out_dir)
        target = base / file_name
        # Tags frequently contain "/", which turns the file name into a nested path.
        target.parent.mkdir(parents=True, exist_ok=True)
        return target

    def _export_event_field(
        self, tags: Iterable, *, type_str: str, reader, field: str, out_dir: Optional[Path]
    ) -> Tuple[Tuple, ...]:
        """Collect attribute ``field`` of every event ``reader(tag)`` yields, per tag, pickling to ``<tag>.pkl`` if saving."""
        tags = self._normalise_tags(tags)
        self.tag_test(*tags, type_str=type_str)
        exported = []
        for tag in tags:
            values = tuple(getattr(event, field) for event in reader(tag))
            if self.save_to_disk:
                with open(self._target_path(out_dir, f"{tag}.pkl"), "wb") as f:
                    dump(values, f)
            exported.append(values)
        return tuple(exported)

    def _tensor_arrays(self, tags: Iterable) -> numpy.ndarray:
        """Convert the tensor protos of ``tags`` to one numpy array, ordered step by step across the tags."""
        import tensorflow

        protos_per_tag = [
            [event.tensor_proto for event in self.event_acc.Tensors(tag)]
            for tag in tags
        ]
        # NOTE: tags with fewer events are padded with None by zip_longest, which
        # make_ndarray does not accept; all tags are expected to have equally many events.
        return numpy.array(
            [
                tensorflow.make_ndarray(proto)
                for step in zip_longest(*protos_per_tag, fillvalue=None)
                for proto in step
            ]
        )

    def tag_test(self, *tags, type_str: Union[str, TagTypeEnum]) -> bool:
        """
        Check that all requested tags are available for the given tag type.

        :param tags: tag names, or a single non-string iterable of tag names
        :param type_str: tag type, e.g. ``"scalars"``, or a member of ``TagTypeEnum``
        :return: ``True`` when every requested tag is available
        :raises AssertionError: if a requested tag is not available for ``type_str``"""
        if isinstance(type_str, Enum):
            type_str = type_str.value

        tags = self._normalise_tags(tags)
        if not tags:
            print("No tags requested")  # raise Exception #TODO: maybe

        tags_available = self.tags_available[type_str]
        # Raised explicitly (instead of using an assert statement) so that the check is
        # not stripped when Python runs with -O; the exception type is unchanged.
        if not all(tag in tags_available for tag in tags):
            raise AssertionError(
                f"{type_str} tags available: {tags_available}, tags requested {tags}"
            )
        return True

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        return False  # never suppress exceptions

    def export_line_plot(
        self, *tags: Iterable[str], out_dir: Optional[Path] = None
    ) -> Tuple[pyplot.Axes, ...]:
        """
        Plot value over step for each requested scalar tag.

        :param tags: scalar tags to plot
        :param out_dir: directory for ``<tag>_line_plot.png`` when saving, defaults to the
          current working directory at call time
        :return: one matplotlib axes per tag"""
        tags = self._normalise_tags(tags)
        self.tag_test(*tags, type_str="scalars")
        out = []
        for tag in tags:
            events = self.event_acc.Scalars(tag)
            fig, ax = pyplot.subplots(nrows=1, ncols=1)
            ax.plot([event.step for event in events], [event.value for event in events])
            if self.save_to_disk:
                fig.savefig(str(self._target_path(out_dir, f"{tag}_line_plot.png")))
            out.append(ax)
        return tuple(out)

    def export_image(
        self, *tags: Iterable[str], out_dir: Optional[Path] = None
    ) -> Tuple[Tuple[Image, ...], ...]:
        """
        Decode every stored image of the requested image tags.

        :param tags: image tags to export
        :param out_dir: directory for ``<tag>_img_<step>.png`` when saving, defaults to the
          current working directory at call time
        :return: per tag, a tuple with one PIL image per stored event"""
        # `Image` at module level is the class; decoding needs the module's open().
        from PIL import Image as pil_image

        tags = self._normalise_tags(tags)
        self.tag_test(*tags, type_str="images")
        out = []
        for tag in tags:
            decoded = []
            # Images() returns a list of events, one per stored step.
            for event in self.event_acc.Images(tag):
                # TODO: CHECK VIDEO compatibility(gifs)
                if self.save_to_disk:
                    target = self._target_path(out_dir, f"{tag}_img_{event.step}.png")
                    with open(target, "wb") as f:
                        f.write(event.encoded_image_string)
                decoded.append(pil_image.open(BytesIO(event.encoded_image_string)))
            out.append(tuple(decoded))
        return tuple(out)

    def export_scalar(
        self, *tags: Iterable[str], out_dir: Optional[Path] = None
    ) -> Tuple[Tuple, ...]:
        """
        Export the values of the requested scalar tags.

        If saving to disk, the values of each tag are pickled to ``<tag>.pkl``.

        :param tags: scalar tags to export
        :param out_dir: output directory when saving, defaults to the current working
          directory at call time
        :return: per tag, a tuple of its values"""
        return self._export_event_field(
            tags,
            type_str="scalars",
            reader=self.event_acc.Scalars,
            field="value",
            out_dir=out_dir,
        )

    def export_distribution(
        self, *tags: Iterable[str], out_dir: Optional[Path] = None
    ):
        """
        Not implemented yet.

        :param tags: distribution tags to export
        :param out_dir: unused
        :raises NotImplementedError: always, after the tags have been validated"""
        self.tag_test(*self._normalise_tags(tags), type_str="distributions")
        raise NotImplementedError("not implemented yet!")

    def export_tensor(
        self, *tags: Iterable[str], out_dir: Optional[Path] = None
    ) -> Tuple[Tuple, ...]:
        """
        Export the tensor protos of the requested tensor tags.

        If saving to disk, the protos of each tag are pickled to ``<tag>.pkl``.

        :param tags: tensor tags to export
        :param out_dir: output directory when saving, defaults to the current working
          directory at call time
        :return: per tag, a tuple of its tensor protos"""
        return self._export_event_field(
            tags,
            type_str="tensors",
            reader=self.event_acc.Tensors,
            field="tensor_proto",
            out_dir=out_dir,
        )

    def export_graph(
        self, *tags: Iterable[str], out_dir: Optional[Path] = None
    ) -> Tuple:
        """
        Export the graph definition stored in the events.

        If saving to disk, the graph definition is pickled to ``graph.pkl``.

        :param tags: unused, a run has a single graph
        :param out_dir: output directory when saving, defaults to the current working
          directory at call time
        :return: a one-element tuple holding whatever ``EventAccumulator.Graph()`` returns"""
        # Graph() yields one graph definition object, not a sequence of events, and
        # (per TensorBoard's implementation) raises if the events contain no graph.
        graph = self.event_acc.Graph()
        if self.save_to_disk:
            with open(self._target_path(out_dir, "graph.pkl"), "wb") as f:
                dump(graph, f)
        return (graph,)

    def export_audio(
        self, *tags: Iterable[str], out_dir: Optional[Path] = None
    ) -> Tuple[Tuple, ...]:
        """
        Export the encoded audio of the requested audio tags.

        If saving to disk, the encoded audio strings of each tag are pickled to ``<tag>.pkl``.

        :param tags: audio tags to export
        :param out_dir: output directory when saving, defaults to the current working
          directory at call time
        :return: per tag, a tuple of its encoded audio strings"""
        # Audio events carry more than three fields, so the payload is read by name
        # rather than by unpacking the transposed events.
        return self._export_event_field(
            tags,
            type_str="audio",
            reader=self.event_acc.Audio,
            field="encoded_audio_string",
            out_dir=out_dir,
        )

    def export_histogram(
        self, *tags: Iterable[str], out_dir: Optional[Path] = None
    ) -> Tuple[Tuple, ...]:
        """
        Export the histogram values of the requested histogram tags.

        https://www.tensorflow.org/api_docs/python/tf/summary/histogram

        If saving to disk, the histogram values of each tag are pickled to ``<tag>.pkl``.

        :param tags: histogram tags to export
        :param out_dir: output directory when saving, defaults to the current working
          directory at call time
        :return: per tag, a tuple of its histogram values"""
        return self._export_event_field(
            tags,
            type_str="histograms",
            reader=self.event_acc.Histograms,
            field="histogram_value",
            out_dir=out_dir,
        )

    @passes_kws_to(pandas.DataFrame.to_csv)
    def scalar_export_csv(
        self,
        *tags: Iterable[str],
        out_dir: Optional[Path] = None,
        index_label: str = "epoch",
        **kwargs,
    ) -> Tuple[pandas.DataFrame]:
        """
        Export the requested scalar tags as one data frame with a column per tag.

        size_guidance = 0 means all events, no aggregation or dropping

        Tags with fewer events than the longest one are padded with ``None``.

        :param tags: scalar tags to export
        :param out_dir: directory for ``scalars_<tags joined by "_">.csv`` when saving,
          defaults to the current working directory at call time
        :param index_label: label of the index column in the csv file
        :param kwargs: passed on to ``pandas.DataFrame.to_csv``
        :return: a one-element tuple holding the data frame"""
        tags = self._normalise_tags(tags)
        self.tag_test(*tags, type_str="scalars")  # also reports when no tags were requested

        values_per_tag = [
            [event.value for event in self.event_acc.Scalars(tag)] for tag in tags
        ]
        df = pandas.DataFrame(
            list(zip_longest(*values_per_tag, fillvalue=None)), columns=list(tags)
        )

        if self.save_to_disk:
            file_name = f'scalars_{"_".join(tags) if len(tags) else "none"}.csv'
            df.to_csv(
                str(self._target_path(out_dir, file_name)),
                columns=list(tags),
                index_label=index_label,
                **kwargs,
            )
        return (df,)

    @passes_kws_to(pandas.DataFrame.to_csv)
    def pr_curve_export_csv(
        self,
        *tags: Iterable[str],
        out_dir: Optional[Path] = None,
        index_label: str = "epoch",
        **kwargs,
    ) -> Tuple[pandas.DataFrame]:
        """
        Export precision-recall curve tensors as a data frame with six columns per tag.

        #TODO only supports a single step and tag for now

        size_guidance = 0 means all events, no aggregation or dropping

        :param tags: tensor tags holding pr curve data
        :param out_dir: directory for ``tensors_<tags joined by "_">.csv`` when saving,
          defaults to the current working directory at call time
        :param index_label: label of the index column in the csv file
        :param kwargs: passed on to ``pandas.DataFrame.to_csv``
        :return: a one-element tuple holding the data frame"""
        tags = self._normalise_tags(tags)
        self.tag_test(*tags, type_str="tensors")  # also reports when no tags were requested

        numpy_rep = self._tensor_arrays(tags)
        labels = (
            "true_positive_counts",
            "false_positive_counts",
            "true_negative_counts",
            "false_negative_counts",
            "precision",
            "recall",
        )
        df = pandas.DataFrame(
            numpy_rep.tolist(), columns=[f"{t}_{l}" for t in tags for l in labels]
        )

        if self.save_to_disk:
            file_name = f'tensors_{"_".join(tags) if len(tags) else "none"}.csv'
            df.to_csv(
                str(self._target_path(out_dir, file_name)),
                index_label=index_label,
                **kwargs,
            )
        return (df,)

    @passes_kws_to(pandas.DataFrame.to_csv)
    def tensor_export_csv(
        self,
        *tags: Iterable[str],
        out_dir: Optional[Path] = None,
        index_label: str = "epoch",
        **kwargs,
    ) -> Tuple[pandas.DataFrame]:
        """
        Export the requested tensor tags as a data frame holding the nested tensor values.

        size_guidance = 0 means all events, no aggregation or dropping

        NOTE: all tensor values end up in a single cell, so the frame can only be built
        for exactly one tag.

        :param tags: tensor tags to export
        :param out_dir: directory for ``tensors_<tags joined by "_">.csv`` when saving,
          defaults to the current working directory at call time
        :param index_label: label of the index column in the csv file
        :param kwargs: passed on to ``pandas.DataFrame.to_csv``
        :return: a one-element tuple holding the data frame"""
        tags = self._normalise_tags(tags)
        self.tag_test(*tags, type_str="tensors")  # also reports when no tags were requested

        numpy_rep = self._tensor_arrays(tags)
        df = pandas.DataFrame([[numpy_rep.tolist()]], columns=list(tags))

        if self.save_to_disk:
            file_name = f'tensors_{"_".join(tags) if len(tags) else "none"}.csv'
            df.to_csv(
                str(self._target_path(out_dir, file_name)),
                index_label=index_label,
                **kwargs,
            )
        return (df,)
if __name__ == "__main__":

    def a() -> None:
        """
        Demo: export data from the first event file found in the Draugr user log directory.

        :rtype: None
        """
        # Default of None so that a log directory without event files is reported
        # instead of surfacing as a bare StopIteration.
        _path_to_events_file = next(
            AppPath("Draugr", "Christian Heider Nielsen").user_log.rglob(
                "events.out.tfevents.*"
            ),
            None,
        )
        if _path_to_events_file is None:
            print("No tensorboard event files found")
            return

        print(_path_to_events_file)
        tee = TensorboardEventExporter(_path_to_events_file.parent, save_to_disk=True)
        print(tee.tags_available)
        # tee.export_csv('train_loss')
        # tee.export_line_plot('train_loss')
        # pyplot.show()
        print(tee.export_histogram())
        print(tee.available_scalars)  # attribute created dynamically in __init__
        print(
            tee.pr_curve_export_csv(
                *tee.tags_available["tensors"],
                out_dir=ensure_existence(Path.cwd() / "exclude"),
            )
        )
        print(list(iter(tee.TagTypeEnum)))

    a()