import json
import logging
import os
from copy import deepcopy
import slugid
logger = logging.getLogger()
if "HIGLASS_PYTHON_DEBUG" in os.environ and os.environ["HIGLASS_PYTHON_DEBUG"]:
logger.setLevel(logging.DEBUG)
else:
logger.setLevel(logging.ERROR)
__all__ = ["Track", "CombinedTrack", "View", "ViewConf"]
_track_default_position = {
"2d-rectangle-domains": "center",
"bedlike": "top",
"horizontal-bar": "top",
"horizontal-chromosome-labels": "top",
"chromosome-labels": "top",
"horizontal-gene-annotations": "top",
"horizontal-heatmap": "top",
"horizontal-1d-heatmap": "top",
"horizontal-line": "top",
"horizontal-multivec": "top",
"bar": "top",
"chromosome-labels": "top",
"gene-annotations": "top",
"heatmap": "top",
"1d-heatmap": "top",
"line": "top",
"horizontal-multivec": "top",
"heatmap": "center",
"left-axis": "left",
"osm-tiles": "center",
"top-axis": "top",
"viewport-projection-center": "center",
"viewport-projection-horizontal": "top",
}
_datatype_default_track = {
"2d-rectangle-domains": "2d-rectangle-domains",
"bedlike": "bedlike",
"chromsizes": "horizontal-chromosome-labels",
"gene-annotations": "horizontal-gene-annotations",
"matrix": "heatmap",
"vector": "horizontal-bar",
"multivec": "horizontal-multivec",
}
class Component:
def __repr__(self):
return json.dumps(self.to_dict(), indent=2)
@classmethod
def from_dict(cls, conf):
raise NotImplementedError
def to_dict(self):
raise NotImplementedError
[docs]class Track(Component):
"""
Configure a Track
Parameters
----------
track_type : str
The type of track (e.g. 'heatmap', 'line')
position: str
The position of the track (e.g. 'top')
tileset : :class:`Tileset`
A Tileset being displayed in this track
file_url: str
An http accessible tileset file
filetype : str
The type of the remote tilesets (e.g. 'bigwig', 'cooler', etc...)
server : str, optional
The server name (usually just 'localhost')
height : int, optional
The height of the track (in pixels)
width : int, optional
The width of the track (in pixels)
options : dict, optional
The options to pass onto the track
"""
def __init__(
self,
track_type=None,
position=None,
tileset=None,
file_url=None,
filetype=None,
options=None,
**kwargs,
):
if track_type is None:
if "type" in kwargs:
track_type = kwargs.pop("type")
else:
raise ValueError("Track type is required.")
if not position:
position = _track_default_position[track_type]
self.position = position
self.tileset = tileset
# populate the actual config
self.conf = {"type": track_type}
if tileset is not None:
self.conf["tilesetUid"] = tileset.uuid
elif "tileset_uuid" in kwargs:
self.conf["tilesetUid"] = kwargs.pop("tileset_uuid")
elif file_url is not None and filetype is not None:
self.conf["fileUrl"] = file_url
self.conf["filetype"] = filetype
if options is None:
self.conf["options"] = {}
else:
self.conf["options"] = deepcopy(options)
self.conf.update(kwargs)
if "uid" not in self.conf:
self.conf["uid"] = slugid.nice()
@property
def uid(self):
return self.conf["uid"]
@property
def options(self):
return self.conf["options"]
@property
def type(self):
return self.conf["type"]
[docs] def change_attributes(self, **kwargs):
"""
Change an attribute of this track and return a new copy.
"""
conf = self.conf.copy()
conf.update(kwargs)
return self.__class__(
conf["type"], position=self.position, tileset=self.tileset, **conf
)
[docs] def change_options(self, **kwargs):
"""
Change one of the track's options in the viewconf
"""
options = self.conf["options"].copy()
options.update(kwargs)
return self.change_attributes(options=options)
def __add__(self, other):
"""Overload the + operator to create combined tracks."""
new_tracks = []
if self.conf["type"] == "combined":
# this is a combined track
for track in self.tracks:
new_tracks += [track.copy()]
else:
new_tracks += [self]
if other.conf["type"] == "combined":
for track in other.tracks:
new_tracks += [track.copy()]
else:
new_tracks += [other.copy()]
return CombinedTrack(new_tracks)
def __truediv__(self, other):
return DividedTrack(self, other)
@classmethod
def from_dict(cls, conf):
return cls(**conf)
def to_dict(self):
return self.conf.copy()
def copy(self):
return Track(**self.to_dict())
class DividedTrack(Track):
"""A track representing one tileset divided by another.
Only works with some tileset types.
"""
def __init__(self, numerator, denominator, *args, **kwargs):
"""This track is created using two tilesets.
Parameters
----------
numerator (tileset):
The tileset to be divided
denominator (tileset):
The tileset to divide by
"""
if numerator.conf["type"] != denominator.conf["type"]:
raise ValueError(
f"Different track types: {numerator.conf['type']}, {denominator.conf['type']}"
)
if json.dumps(numerator.conf["options"]) != json.dumps(
denominator.conf["options"]
):
logger.warn(
"Tracks have different options, so we're using the first track's"
)
numerator_server = numerator.conf["server"]
numerator_uuid = numerator.conf["tilesetUid"]
denominator_server = denominator.conf["server"]
denominator_uuid = denominator.conf["tilesetUid"]
track_type = numerator.conf["type"]
position = numerator.position
options = numerator.conf["options"]
height = numerator.conf["height"] if "height" in numerator.conf else None
data_config = {
"type": "divided",
"children": [
{"server": numerator_server, "tilesetUid": numerator_uuid},
{"server": denominator_server, "tilesetUid": denominator_uuid},
],
}
super().__init__(
data=data_config,
type=track_type,
position=position,
options=options,
height=height,
*args,
**kwargs,
)
def change_attributes(self, **kwargs):
"""
Change an attribute of this track and return a new copy.
"""
conf = self.conf.copy()
conf.update(kwargs)
return Track(conf["type"]).from_dict(conf)
class CombinedTrack(Track):
def __init__(self, tracks, position=None, height=None, **kwargs):
"""
The combined track contains multiple actual tracks as layers.
Parameters
----------
tracks: list
A list of Tracks to add
"""
self.tracks = tracks
self.tileset = None
# try to get the position from a subtrack
self.position = position
if position is None:
for track in tracks:
if track.position:
self.position = track.position
break
for track in tracks:
if track.conf["type"] == "viewport-projection":
track.conf["type"] = position_to_viewport_projection_type(self.position)
track.position = self.position
#
# if no height is specified try to infer it from
# the containing tracks
if not height:
for track in tracks:
if "height" in track.conf and track.conf["height"]:
if not height:
height = track.conf["height"]
else:
height = max(height, track.conf["height"])
if height:
self.height = height
self.conf = {"type": "combined", "height": height}
else:
self.conf = {"type": "combined"}
@classmethod
def from_dict(cls, conf):
if "contents" in conf:
conf = conf.copy()
contents = conf.pop("contents")
tracks = [Track.from_dict(track_conf) for track_conf in contents]
else:
tracks = []
return cls(tracks, **conf)
def to_dict(self):
conf = self.conf.copy()
conf["contents"] = [t.to_dict() for t in self.tracks]
return conf
[docs]class View(Component):
"""
Configure a View
Parameters
----------
tracks: []
A list of Tracks to include in this view
x: int
The position of this view on the grid
y: int
The position of this view on the grid
width: int
The width of this of view on a 12 unit grid
height: int
The height of the this view. The height is proportional
to the height of all the views present.
initialXDoamin: [int, int]
The initial x range of the view
initialYDomain: [int, int]
The initial y range of the view
uid: string
The uid of new view
"""
def __init__(
self,
tracks=[],
x=0,
y=0,
width=12,
height=6,
initialXDomain=None,
initialYDomain=None,
uid=None,
overlays=[],
chrominfo=None,
geneinfo=None,
):
if uid is None:
uid = slugid.nice()
self.uid = uid
self.conf = {
"uid": uid,
"tracks": {"top": [], "center": [], "left": [], "right": [], "bottom": []},
"layout": {"w": width, "h": height, "x": x, "y": y},
}
if initialXDomain is not None:
self.conf["initialXDomain"] = initialXDomain
if initialYDomain is not None:
self.conf["initialYDomain"] = initialYDomain
self._track_position = {}
# autcomplete and chrominfo are used to poplulate the
# genome position search box
if chrominfo:
self.add_chrominfo(chrominfo)
if geneinfo:
self.add_geneinfo(geneinfo)
for track in tracks:
if isinstance(track, (tuple, list)):
new_track = CombinedTrack(track)
self.add_track(new_track)
else:
self.add_track(track)
if not chrominfo:
if "genomePositionSearchBox" in self.conf:
del self.conf["genomePositionSearchBox"]
if "genomePositionSearchBoxVisible" in self.conf:
del self.conf["genomePositionSearchBoxVisible"]
for i, overlay in enumerate(overlays):
# The uids need to be unique so if no uid is available we need to
# define a unique uid before calling `self.add_overlay`.
overlay["uid"] = overlay.get("uid", "overlay-{}".format(i))
self.add_overlay(overlay)
@property
def tracks(self):
return list(self._track_position.keys())
[docs] def add_track(self, track, position=None):
"""
Add a track to a position.
Parameters
----------
track : :class:`Track`
Track to add.
position : {'top', 'bottom', 'center', 'left', 'right'}
Location of track on the view. If not provided, we look for an
assigned ``position`` attribute in ``track``. If it does not exist,
we fall back on a default position if the track type has one.
"""
if position is None:
if track.position is not None:
position = track.position
elif track.type in _track_default_position:
position = _track_default_position[track.type]
else:
raise ValueError("A track position is required.")
self._track_position[track] = position
def create_track(self, track_type, **kwargs):
if track_type == "combined":
klass = CombinedTrack
else:
klass = Track
position = kwargs.pop("position", None)
track = klass(track_type=track_type, **kwargs)
self.add_track(track, position)
return track
@classmethod
def from_dict(cls, conf):
layout = conf.get("layout", {})
self = cls(
x=layout.get("x", 0),
y=layout.get("y", 0),
width=layout.get("w", 12),
height=layout.get("h", 6),
initialXDomain=conf.get("initialXDomain", None),
initialYDomain=conf.get("initialYDomain", None),
uid=conf.get("uid", None),
overlays=conf.get("overlays", []),
)
if "genomePositionSearchBox" in conf:
self.conf["genomePositionSearchBox"] = json.loads(
json.dumps(conf["genomePositionSearchBox"])
)
for position in conf.get("tracks", {}):
for track_conf in conf["tracks"][position]:
if track_conf["type"] == "combined":
klass = CombinedTrack
else:
klass = Track
# position has to be passed in as part of the parameter
# array so that the constructor can be called with it as
# a parameter
self.add_track(
track=klass.from_dict({"position": position, **track_conf})
)
return self
[docs] def to_dict(self):
"""
Convert the existing track to a JSON representation.
"""
conf = json.loads(json.dumps(self.conf))
for track, position in self._track_position.items():
conf["tracks"][position].append(track.to_dict())
return conf
def add_overlay(self, overlay):
if "overlays" not in self.conf:
self.conf["overlays"] = []
try:
options = overlay.get("options", {})
overlay_conf = {
"uid": overlay.get("uid", "overlay"),
"includes": overlay.get("includes", []),
"type": overlay.get("type", ""),
"options": {"extent": overlay.get("extent", [])},
}
overlay_conf["options"].update(options)
self.conf["overlays"].append(overlay_conf)
except KeyError:
pass
def add_geneinfo(self, track):
self.conf["genomePositionSearchBoxVisible"] = True
if "genomePositionSearchBox" in self.conf:
gpsb = self.conf["genomePositionSearchBox"]
else:
gpsb = {}
gpsb["autocompleteServer"] = track.conf["server"]
gpsb["autocompleteId"] = track.conf["tilesetUid"]
gpsb["visible"] = True
self.conf["genomePositionSearchBox"] = gpsb
def add_chrominfo(self, track):
self.conf["genomePositionSearchBoxVisible"] = True
if "genomePositionSearchBox" in self.conf:
gpsb = self.conf["genomePositionSearchBox"]
else:
gpsb = {}
gpsb["chromInfoId"] = track.conf["tilesetUid"]
gpsb["chromInfoServer"] = track.conf["server"]
gpsb["visible"] = True
self.conf["genomePositionSearchBox"] = gpsb
[docs]class ViewConf(Component):
"""Configure a dashboard"""
def __init__(
self, views=[], location_syncs=[], value_scale_syncs=[], zoom_syncs=[]
):
"""A Python representation of a HiGlass viewconf.
Parameters
----------
views: list[list]
A list of View objects which compose the viewable scene
location_syncs: list[list]
A list of lists of Views to be synced
value_scale_syncs: list[list]
A list containing the value scale syncs. Each sync can be
one of:
1. a list of (View, Track) tuples
2. a list of Tracks (assumes that there is only one view)
3. a list of strings of the form "{viewUid}.{trackUid}"
location_syncs: list[list]
A list of lists of Views to be synced
"""
self.conf = {
"editable": True,
"views": [],
"trackSourceServers": ["http://higlass.io/api/v1"],
"locationLocks": {"locksByViewUid": {}, "locksDict": {}},
"valueScaleLocks": {"locksByViewUid": {}, "locksDict": {}},
"zoomLocks": {"locksByViewUid": {}, "locksDict": {}},
"exportViewUrl": "http://higlass.io/api/v1/viewconfs",
}
self._views_by_id = {}
for view in views:
self.add_view(view)
for location_sync in location_syncs:
self.add_location_sync(location_sync)
for value_scale_sync in value_scale_syncs:
self.add_value_scale_sync(value_scale_sync)
for zoom_sync in zoom_syncs:
self.add_zoom_sync(zoom_sync)
@property
def views(self):
return list(self._views_by_id.values())
@property
def default_view(self):
"""Default view of the view config
The default view equals the first view if only one view exist.
Returns:
View -- View instance or ``None`` if more than one view exists.
"""
if len(self.views) == 1:
return self.views[0]
return None
def _combine_view_track_uid(self, view_uid, track_uid):
return f"{view_uid}.{track_uid}"
def _extract_view_track_uids(self, definition):
if isinstance(definition, tuple):
# definition is a tuple of a view and a track instance
view_uid = definition[0].uid
track_uid = definition[1].uid
elif isinstance(definition, Track):
# definition is a track instance which assumes that only one view
# exists
track_uid = definition.uid
elif isinstance(definition, str):
# definition is a string
uids = definition.split(".")
if len(uids) == 2:
view_uid, track_uid = uids
else:
track_uid = uids[0]
else:
logger.warning("Could not extract view and track UID")
track_uid = None
view_uid = None
if self.default_view is not None:
logger.info("Default view is used")
view_uid = self.default_view.uid
return view_uid, track_uid
def _add_sync(self, lock_group, lock_id, view_uids):
for view_uid in view_uids:
if lock_id not in self.conf[lock_group]["locksDict"]:
self.conf[lock_group]["locksDict"][lock_id] = {}
self.conf[lock_group]["locksDict"][lock_id][view_uid] = (1, 1, 1)
self.conf[lock_group]["locksByViewUid"][view_uid] = lock_id
def add_zoom_sync(self, views_to_sync):
lock_id = slugid.nice()
# TODO: check that view already exists in viewconf
self._add_sync("zoomLocks", lock_id, [v.uid for v in views_to_sync])
def add_location_sync(self, views_to_sync):
lock_id = slugid.nice()
# TODO: check that view already exists in viewconf
self._add_sync("locationLocks", lock_id, [v.uid for v in views_to_sync])
def add_value_scale_sync(self, tracks_to_sync):
locks_map = self.conf["valueScaleLocks"]["locksByViewUid"]
locks_dict = self.conf["valueScaleLocks"]["locksDict"]
lock_id = slugid.nice()
for definition in tracks_to_sync:
v_uid, t_uid = self._extract_view_track_uids(definition)
if v_uid is None or t_uid is None:
# If no view UID is found the definition seems to be broken
logger.warning("View or track definition is broken")
continue
vt_uid = self._combine_view_track_uid(v_uid, t_uid)
if lock_id not in locks_dict:
locks_dict[lock_id] = {}
locks_dict[lock_id][vt_uid] = {"view": v_uid, "track": t_uid}
locks_map[vt_uid] = lock_id
[docs] def add_view(self, view):
"""
Add a new view
Parameters
----------
view: :class:`View`
View object to add
"""
for uid in self._views_by_id.keys():
if uid == view.uid:
raise ValueError("View with this uid already exists")
self._views_by_id[view.uid] = view
def create_view(self, *args, **kwargs):
view = View(*args, **kwargs)
self.add_view(view)
return view
@classmethod
def from_link(cls, url):
from urllib.parse import urlsplit, urlunsplit, parse_qs
import requests
# parts: 'scheme://netloc/path?query#fragment'
parts = urlsplit(url)
query = parse_qs(parts.query)
if parts.path.strip("/") == "app":
if "config" not in query:
raise ValueError("Viewconf ID not found in query")
conf_id = query["config"][0]
elif parts.path.strip("/") in ("api/v1/viewconfs", "l", "link"):
if "d" not in query:
raise ValueError("Viewconf ID not found in query")
conf_id = query["d"][0]
else:
raise ValueError("Not a valid viewconf server")
endpoint = urlunsplit(
(parts.scheme, parts.netloc, "api/v1/viewconfs", "d=" + conf_id, "")
)
conf = requests.get(endpoint).json()
return cls.from_dict(conf)
@classmethod
def from_dict(cls, conf):
self = cls()
for view_dct in conf.get("views", []):
self.add_view(View.from_dict(view_dct))
locks = conf.get("locationLocks", {}).get("locksDict", {})
for lock_id, attrs in locks.items():
self._add_sync("locationLocks", lock_id, attrs.keys())
locks = conf.get("valueScaleLocks", {}).get("locksDict", {})
for lock_id, attrs in locks.items():
self._add_sync("valueScaleLocks", lock_id, attrs.keys())
locks = conf.get("zoomLocks", {}).get("locksDict", {})
for lock, attrs in locks.items():
self._add_sync("zoomLocks", lock_id, attrs.keys())
return self
def to_dict(self):
conf = json.loads(json.dumps(self.conf))
for view in self.views:
conf["views"].append(view.to_dict())
return conf
def tracktype_default_position(tracktype: str):
"""
Get the default track position for a track type.
For example, default position for a heatmap is 'center'.
If the provided track type has no known default position
return None.
Parameters
----------
tracktype: str
The track type to check
Returns
-------
str:
The default position
"""
if tracktype in _track_default_position:
return _track_default_position[tracktype]
return None
def datatype_to_tracktype(datatype):
"""
Infer a default track type from a data type. There can
be other track types that can display a given data type.
Parameters
----------
datatype: str
A datatype identifier (e.g. 'matrix')
Returns
-------
str, str:
A track type (e.g. 'heatmap') and position (e.g. 'top')
"""
track_type = _datatype_default_track.get(datatype, None)
position = _track_default_position.get(track_type, None)
return track_type, position
def position_to_viewport_projection_type(position):
if position == "center":
track_type = "viewport-projection-center"
elif position == "top" or position == "bottom":
track_type = "viewport-projection-horizontal"
elif position == "left" or position == "right":
track_type = "viewport-projection-vertical"
else:
track_type = "viewport-projection"
return track_type
class ViewportProjection(Track):
def __init__(self, view, position=None, options=None):
self.position = position
track_type = position_to_viewport_projection_type(position)
self.conf = {"type": track_type, "fromViewUid": view.uid}
self.view = view
if "uid" not in self.conf:
self.conf["uid"] = slugid.nice()
if options is None:
self.conf["options"] = {}
else:
self.conf["options"] = deepcopy(options)
def copy(self):
"""Copy this track."""
return ViewportProjection(self.view, self.position, self.conf["options"])