# -*- encoding: utf-8 -*-
# @File : np_ops.py
# @Time : 2019-12-04
# @Author : zjh
r"""
Some common numpy operations
"""
__all__ = ("ndarray_index", "points_distance", "ellipse_kernel", "circle_kernel",
"nms_mask", "generate_grid", "seg2point", "seg2line")
import numpy as np
import cv2
[docs]def ndarray_index(shape):
"""
Create np.ndarray index
:param shape: index array shape
:type shape: Union[list, tuple]
:return: np.ndarray index
:rtype: np.ndarray
"""
idx = [np.arange(x) for x in shape]
if len(idx) >= 2:
idx[0], idx[1] = idx[1], idx[0]
idx = np.meshgrid(*idx)
if len(idx) >= 2:
idx[0], idx[1] = idx[1], idx[0]
return np.stack(idx, -1)
[docs]def points_distance(points0, points1, weights=(1., 1.)):
"""
Calculate distances between two point array
:param points0: a numpy array of shape (n, 2) representing the first set of points
:type points0: np.ndarray
:param points1: a numpy array of shape (m, 2) representing the second set of points
:type points1: np.ndarray
:param weights: a tuple of two floats representing the weights for the distance calculation
along the vertical and horizontal axes
:type weights: tuple
:return: a numpy array of shape (n, m) representing the distances between each pair of
points from points0 and points1
:rtype: np.ndarray
"""
points0 = np.reshape(points0, [-1, 1, 2])
points1 = np.reshape(points1, [1, -1, 2])
dist = ((points0 - points1) * np.asarray(weights)) ** 2
dist = np.sum(dist, axis=-1)
dist = np.sqrt(dist)
return dist
[docs]def ellipse_kernel(ksize, dtype=np.int32):
"""
Create ellipse kernel
:param ksize: kernel size, a tuple of (height, width)
:type ksize: tuple
:param dtype: data type of kernel
:type dtype: numpy.dtype, optional
:return: ellipse kernel
:rtype: numpy.ndarray
"""
assert len(ksize) == 2 and all(x > 0 for x in ksize)
indexs = ndarray_index(ksize) + 0.5
center = np.asarray(ksize) / 2
dis = points_distance(indexs, center, weights=[1, ksize[0] / ksize[1]])
dis = np.reshape(dis, indexs.shape[:-1])
kernel = np.zeros(ksize, dtype=dtype)
kernel[dis <= (ksize[0] - 1) / 2] = 1
return kernel
[docs]def circle_kernel(diameter, dtype=np.int32):
"""
Create circle kernel
:param diameter: diameter of the circle
:type diameter: int
:param dtype: data type of the kernel
:type dtype: numpy.dtype, optional
:return: circle kernel
:rtype: numpy.ndarray
"""
return ellipse_kernel([diameter, diameter], dtype)
[docs]def nms_mask(seg, ksize=3, dtype=None):
"""
Non-maximum suppression mask for segmentation
:param seg: segmentation result, probability map between [0.0, 1.0]
:type seg: np.ndarray
:param ksize: kernel size for max pooling
:type ksize: int
:param dtype: data type of output mask
:type dtype: np.dtype
:return: non-maximum suppression mask
:rtype: np.ndarray
"""
h, w = seg.shape
offset = ksize // 2
T = np.zeros([x + ksize for x in seg.shape], dtype=seg.dtype)
T[offset:offset + h, offset:offset + w] = seg
K = (T[:, i:i + w] for i in range(ksize))
K = np.stack(K, axis=-1)
max_x = np.max(K, axis=-1)
Q = (max_x[i:i + h, :] for i in range(ksize))
Q = np.stack(Q, axis=-1)
max_xy = np.max(Q, axis=-1)
# eq = np.less_equal(np.abs(seg - max_xy), 1e-8)
eq = np.equal(seg, max_xy)
if dtype:
mask = eq.astype(dtype)
else:
mask = eq.astype(seg.dtype)
return mask
[docs]def generate_grid(panel_size,
grid_size=None,
grid_num=None,
overlap_size=None,
overlap_ratio=None,
allow_cross_boundary=False):
"""
Generate grid for crop image patches
:param panel_size: a tuple of image size (height, width)
:type panel_size: tuple
:param grid_size: a tuple of grid size (grid_height, grid_width)
:type panel_size: tuple
:param grid_num: a tuple of grid num (grid_rows, grid_column)
:type panel_size: tuple
:param overlap_size: a tuple of overlap size
:type panel_size: tuple
:param overlap_ratio: a tuple of grid overlap ratio
:type panel_size: tuple
:param allow_cross_boundary: allow the last row or column position cross panel or not
:type panel_size: bool
:return: grids [rows, columns, 4], each grid consists (ymin, xmin, ymax, xmax)
:rtype: np.ndarray
"""
if grid_size is None and grid_num is None:
raise ValueError("param grid_size/grid_num must be set one")
elif grid_size is not None and grid_num is not None:
raise ValueError("param grid_size/grid_num must be set only one")
if overlap_size is None and overlap_ratio is None:
raise ValueError("param overlap_size/overlap_ratio must be set one")
elif overlap_size is not None and overlap_ratio is not None:
raise ValueError("param overlap_size/overlap_ratio must be set only one")
elif overlap_ratio is not None:
if not all(0 <= x < 1 for x in overlap_ratio):
raise ValueError("param overlap_ratio must between in [0, 1)")
elif overlap_size is not None and grid_size is not None:
if not all(0 <= x < y for x, y in zip(overlap_size, grid_size)):
raise ValueError("param overlap_size must be smaller thangrid_size ")
if grid_num and overlap_ratio:
rows, columns = grid_num
r = rows - (rows - 1) * overlap_ratio[0]
c = columns - (columns - 1) * overlap_ratio[1]
grid_size = panel_size[0] / r, panel_size[1] / c
overlap_size = grid_size[0] * overlap_ratio[0], grid_size[1] * overlap_ratio[1]
elif grid_num and overlap_size:
rows, columns = grid_num
grid_size = (panel_size[0] + (rows - 1) * overlap_size[0]) / rows, \
(panel_size[1] + (columns - 1) * overlap_size[1]) / columns
elif grid_size and overlap_ratio:
overlap_size = grid_size[0] * overlap_ratio[0], grid_size[1] * overlap_ratio[1]
# check grid param valid
grid_size = [int(round(x)) for x in grid_size]
assert all(x >= 1 for x in grid_size)
overlap_size = [int(round(x)) for x in overlap_size]
assert all(x >= 0 for x in overlap_size)
assert all(0 <= x < y for x, y in zip(overlap_size, grid_size))
ymin = np.arange(0, panel_size[0], grid_size[0] - overlap_size[0])
xmin = np.arange(0, panel_size[1], grid_size[1] - overlap_size[1])
xmin, ymin = np.meshgrid(xmin, ymin)
ymax, xmax = ymin + grid_size[0], xmin + grid_size[1]
grids = np.stack([ymin, xmin, ymax, xmax], axis=-1)
if allow_cross_boundary is False:
crs_idx = np.where(ymax[:, 0] > panel_size[0])[0]
if len(crs_idx) > 0:
grids = grids[:crs_idx[0], :]
crs_idx = np.where(xmax[0, :] > panel_size[1])[0]
if len(crs_idx) > 0:
grids = grids[:, :crs_idx[0]]
return grids
[docs]def seg2point(seg, max_diameter: int, min_distance, fb_threshold: float = 0.5,
min_fore_count: int = 1, max_fore_count: int = -1,
avg_fore_score: float = 0.55, distance_weights=(1., 1.)):
"""
Find all valid points from segmentation
:param seg: point object segmentation result, probability map between [0.0, 1.0]
:type seg: np.ndarray
:param max_diameter: max_radius for a point
:type max_diameter: int
:param min_distance: min distance between two point center
:type min_distance: float
:param fb_threshold: foreground vs background threshold
:type fb_threshold: float
:param min_fore_count: The minimum count of foreground pixel
:type min_fore_count: int
:param max_fore_count: The maximum count of foreground pixel
:type max_fore_count: int
:param avg_fore_score: The minimum average fore score of foreground
:type avg_fore_score: float
:param distance_weights: distance value's weights of vertical and horizontal
:type distance_weights: tuple
:return: A list of point
:rtype: np.ndarray
"""
c_kernel = circle_kernel(max_diameter, dtype=np.float)
seg_smt = cv2.blur(seg, ksize=(3, 3))
seg_smooth = cv2.blur(seg, ksize=(max_diameter + 1, max_diameter + 1))
nms_msk = nms_mask(seg_smooth, ksize=5, dtype=np.bool)
seg_frt = np.greater(seg, fb_threshold).astype(c_kernel.dtype)
count = cv2.filter2D(seg_frt, -1, c_kernel)
sel = np.logical_and(nms_msk,
np.greater_equal(count, min_fore_count))
sel = np.logical_and(sel,
np.greater_equal(seg_smt, avg_fore_score))
if max_fore_count > 0:
assert max_fore_count > min_fore_count, "max_fore_count(%s) must be bigger " \
"than min_fore_count(%s)" \
% (max_fore_count, min_fore_count)
sel = np.logical_and(sel, np.less_equal(count, max_fore_count))
scr = seg_smt[sel]
idc = ndarray_index(seg.shape)
idc = idc[sel]
candidates = zip(scr, idc)
candidates = sorted(candidates, key=lambda x: x[0], reverse=True)
points = np.zeros([0, 2], dtype=idc.dtype)
for s, p in candidates:
dis = points_distance(p, points, weights=distance_weights)[0]
if np.any(dis < min_distance):
continue
points = np.concatenate([points, [p]], axis=0)
return points
[docs]def seg2line(seg, fb_threshold=0.5, smooth_width=3, partition_width=20,
partition_height=30):
"""
Find all valid lines for segmentation
:param seg: point object segmentation result, probability map between [0.0, 1.0]
:type seg: np.ndarray
:param fb_threshold: foreground vs background threshold
:type fb_threshold: float
:param smooth_width: bin width for line x-coordinate smooth
:type smooth_width: float
:param partition_width: line will split when x-coordinate interval greater than partition_width
:type partition_width: float
:param partition_height: line will split when y-coordinate interval greater than partition_height
:type partition_height: float
:return: a list of lines
:rtype: list
"""
mask = np.greater(seg, fb_threshold)
mask = mask.astype(np.int32)
# solve vertical center
height, width = mask.shape
y_label = np.arange(height)
y_label = np.expand_dims(y_label, axis=1)
mask_label = np.sum(mask * y_label, axis=0)
mask_count = np.sum(mask, axis=0)
centers = []
for i in range(width):
st = max(0, i - int(smooth_width // 2))
ed = min(width, i + (smooth_width + 1) // 2)
if mask_count[i] <= 0:
continue
center_y = mask_label[st:ed].sum() / mask_count[st:ed].sum()
centers.append((i, center_y))
# partition lines
if len(centers) == 0:
return []
lines = [[centers[0]]]
for i in range(1, len(centers)):
cp, cc = centers[i - 1], centers[i]
# judge two point is near enough, connect near points
if cc[0] - cp[0] < partition_width and abs(cc[1] - cp[1]) < partition_height:
if cc[0] - cp[0] > 1:
inter_v = np.linspace(cp[1], cc[1], cc[0] - cp[0] + 1)
for k, v in enumerate(inter_v[1:-1]):
lines[-1].append((cp[0] + 1 + k, v))
else:
lines.append([])
lines[-1].append(centers[i])
return lines