libcamera: utils: Raspberry Pi Camera Tuning Tool

Initial implementation of the Raspberry Pi (BCM2835) Camera Tuning Tool.

All code is licensed under the BSD-2-Clause terms.
Copyright (c) 2019-2020 Raspberry Pi Trading Ltd.

Signed-off-by: Naushir Patuck <naush@raspberrypi.com>
Acked-by: Laurent Pinchart <laurent.pinchart@ideasonboard.com>
Signed-off-by: Laurent Pinchart <laurent.pinchart@ideasonboard.com>
This commit is contained in:
Naushir Patuck 2020-05-03 16:49:53 +01:00 committed by Laurent Pinchart
parent 0db2c8dc75
commit c01cfe14f5
14 changed files with 3552 additions and 0 deletions

823
utils/raspberrypi/ctt/ctt.py Executable file
View file

@ -0,0 +1,823 @@
#!/usr/bin/env python3
#
# SPDX-License-Identifier: BSD-2-Clause
#
# Copyright (C) 2019, Raspberry Pi (Trading) Limited
#
# ctt.py - camera tuning tool
import os
import sys
from ctt_image_load import *
from ctt_ccm import *
from ctt_awb import *
from ctt_alsc import *
from ctt_lux import *
from ctt_noise import *
from ctt_geq import *
from ctt_pretty_print_json import *
import random
import json
import re
"""
This file houses the camera object, which is used to perform the calibrations.
The camera object houses all the calibration images as attributes in two lists:
- imgs (macbeth charts)
- imgs_alsc (alsc correction images)
Various calibrations are methods of the camera object, and the output is stored
in a dictionary called self.json.
Once all the caibration has been completed, the Camera.json is written into a
json file.
The camera object initialises its json dictionary by reading from a pre-written
blank json file. This has been done to avoid reproducing the entire json file
in the code here, thereby avoiding unecessary clutter.
"""
"""
Get the colour and lux values from the strings of each inidvidual image
"""
def get_col_lux(string):
"""
Extract colour and lux values from filename
"""
col = re.search('([0-9]+)[kK](\.(jpg|jpeg|brcm|dng)|_.*\.(jpg|jpeg|brcm|dng))$',string)
lux = re.search('([0-9]+)[lL](\.(jpg|jpeg|brcm|dng)|_.*\.(jpg|jpeg|brcm|dng))$',string)
try:
col = col.group(1)
except AttributeError:
"""
Catch error if images labelled incorrectly and pass reasonable defaults
"""
return None,None
try:
lux = lux.group(1)
except AttributeError:
"""
Catch error if images labelled incorrectly and pass reasonable defaults
Still returns colour if that has been found.
"""
return col,None
return int( col ),int( lux )
"""
Camera object that is the backbone of the tuning tool.
Input is the desired path of the output json.
"""
class Camera:
def __init__(self,jfile):
self.path = os.path.dirname(os.path.expanduser(__file__)) + '/'
if self.path == '/':
self.path = ''
self.imgs = []
self.imgs_alsc = []
self.log = 'Log created : '+ time.asctime(time.localtime(time.time()))
self.log_separator = '\n'+'-'*70+'\n'
self.jf = jfile
"""
initial json dict populated by uncalibrated values
"""
self.json = {
"rpi.black_level" : {
"black_level" : 4096
},
"rpi.dpc" : {
},
"rpi.lux" : {
"reference_shutter_speed": 10000,
"reference_gain": 1,
"reference_aperture": 1.0
},
"rpi.noise" : {
},
"rpi.geq" : {
},
"rpi.sdn": {
},
"rpi.awb": {
"priors" : [
{"lux": 0,"prior":[ 2000, 1.0, 3000, 0.0, 13000, 0.0]},
{"lux": 800,"prior":[ 2000, 0.0, 6000, 2.0, 13000, 2.0]},
{"lux": 1500,"prior":[ 2000, 0.0, 4000, 1.0, 6000, 6.0, 6500, 7.0, 7000, 1.0, 13000, 1.0]}
],
"modes" : {
"auto" : { "lo" : 2500, "hi" : 8000 },
"incandescent" : { "lo" : 2500, "hi" : 3000 },
"tungsten" : { "lo" : 3000, "hi" : 3500 },
"fluorescent" : { "lo" : 4000, "hi" : 4700 },
"indoor" : { "lo" : 3000, "hi" : 5000 },
"daylight" : { "lo" : 5500, "hi" : 6500 },
"cloudy" : { "lo" : 7000, "hi" : 8600 }
},
"bayes" : 1
},
"rpi.agc" : {
"metering_modes" : {
"centre-weighted" : {
"weights" : [3, 3, 3, 2, 2, 2, 2, 1, 1, 1, 1, 0, 0, 0, 0]
},
"spot" : {
"weights" : [2, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
},
"matrix": {
"weights" : [1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]
}
},
"exposure_modes" : {
"normal" : {
"shutter" : [100, 10000, 30000, 60000, 120000],
"gain" : [1.0, 2.0, 4.0, 6.0, 6.0]
},
"sport": {
"shutter": [ 100, 5000, 10000, 20000, 120000 ],
"gain": [ 1.0, 2.0, 4.0, 6.0, 6.0 ]
}
},
"constraint_modes" : {
"normal" : [
{"bound" : "LOWER", "q_lo" : 0.98, "q_hi" : 1.0, "y_target" : [0, 0.5, 1000, 0.5]}
],
"highlight": [
{ "bound": "LOWER", "q_lo": 0.98, "q_hi": 1.0, "y_target": [ 0, 0.5, 1000, 0.5 ] },
{ "bound": "UPPER", "q_lo": 0.98, "q_hi": 1.0, "y_target": [ 0, 0.8, 1000, 0.8 ] }
]
},
"y_target" : [0, 0.16, 1000, 0.165, 10000, 0.17]
},
"rpi.alsc": {
'omega' : 1.3,
'n_iter' : 100,
'luminance_strength' : 0.7,
},
"rpi.contrast" : {
"ce_enable": 1,
"gamma_curve": [
0, 0,
1024, 5040,
2048, 9338,
3072, 12356,
4096, 15312,
5120, 18051,
6144, 20790,
7168, 23193,
8192, 25744,
9216, 27942,
10240, 30035,
11264, 32005,
12288, 33975,
13312, 35815,
14336, 37600,
15360, 39168,
16384, 40642,
18432, 43379,
20480, 45749,
22528, 47753,
24576, 49621,
26624, 51253,
28672, 52698,
30720, 53796,
32768, 54876,
36864, 57012,
40960, 58656,
45056, 59954,
49152, 61183,
53248, 62355,
57344, 63419,
61440, 64476,
65535, 65535
]
},
"rpi.ccm": {
},
"rpi.sharpen":{
}
}
"""
Perform colour correction calibrations by comparing macbeth patch colours
to standard macbeth chart colours.
"""
def ccm_cal(self,do_alsc_colour):
if 'rpi.ccm' in self.disable:
return 1
print('\nStarting CCM calibration')
self.log_new_sec('CCM')
"""
if image is greyscale then CCm makes no sense
"""
if self.grey:
print('\nERROR: Can\'t do CCM on greyscale image!')
self.log += '\nERROR: Cannot perform CCM calibration '
self.log += 'on greyscale image!\nCCM aborted!'
del self.json['rpi.ccm']
return 0
a = time.time()
"""
Check if alsc tables have been generated, if not then do ccm without
alsc
"""
if (not "rpi.alsc" in self.disable) and do_alsc_colour:
"""
case where ALSC colour has been done, so no errors should be
expected...
"""
try:
cal_cr_list = self.json['rpi.alsc']['calibrations_Cr']
cal_cb_list = self.json['rpi.alsc']['calibrations_Cb']
self.log += '\nALSC tables found successfully'
except KeyError:
cal_cr_list,cal_cb_list=None,None
print('WARNING! No ALSC tables found for CCM!')
print('Performing CCM calibrations without ALSC correction...')
self.log += '\nWARNING: No ALSC tables found.\nCCM calibration '
self.log += 'performed without ALSC correction...'
else:
"""
case where config options result in CCM done without ALSC colour tables
"""
cal_cr_list,cal_cb_list=None,None
self.log += '\nWARNING: No ALSC tables found.\nCCM calibration '
self.log += 'performed without ALSC correction...'
"""
Do CCM calibration
"""
try:
ccms = ccm(self,cal_cr_list,cal_cb_list)
except ArithmeticError:
print('ERROR: Matrix is singular!\nTake new pictures and try again...')
self.log += '\nERROR: Singular matrix encountered during fit!'
self.log += '\nCCM aborted!'
return 1
"""
Write output to json
"""
self.json['rpi.ccm']['ccms'] = ccms
self.log += '\nCCM calibration written to json file'
print('Finished CCM calibration')
"""
Auto white balance calibration produces a colour curve for
various colour temperatures, as well as providing a maximum 'wiggle room'
distance from this curve (transverse_neg/pos).
"""
def awb_cal(self,greyworld,do_alsc_colour):
if 'rpi.awb' in self.disable:
return 1
print('\nStarting AWB calibration')
self.log_new_sec('AWB')
"""
if image is greyscale then AWB makes no sense
"""
if self.grey:
print('\nERROR: Can\'t do AWB on greyscale image!')
self.log += '\nERROR: Cannot perform AWB calibration '
self.log += 'on greyscale image!\nAWB aborted!'
del self.json['rpi.awb']
return 0
"""
optional set greyworld (e.g. for noir cameras)
"""
if greyworld:
self.json['rpi.awb']['bayes'] = 0
self.log += '\nGreyworld set'
"""
Check if alsc tables have been generated, if not then do awb without
alsc correction
"""
if (not "rpi.alsc" in self.disable) and do_alsc_colour:
try:
cal_cr_list = self.json['rpi.alsc']['calibrations_Cr']
cal_cb_list = self.json['rpi.alsc']['calibrations_Cb']
self.log += '\nALSC tables found successfully'
except KeyError:
cal_cr_list,cal_cb_list=None,None
print('ERROR, no ALSC calibrations found for AWB')
print('Performing AWB without ALSC tables')
self.log += '\nWARNING: No ALSC tables found.\nAWB calibration '
self.log += 'performed without ALSC correction...'
else:
cal_cr_list,cal_cb_list=None,None
self.log += '\nWARNING: No ALSC tables found.\nAWB calibration '
self.log += 'performed without ALSC correction...'
"""
call calibration function
"""
plot = "rpi.awb" in self.plot
awb_out = awb(self,cal_cr_list,cal_cb_list,plot)
ct_curve,transverse_neg,transverse_pos = awb_out
"""
write output to json
"""
self.json['rpi.awb']['ct_curve'] = ct_curve
self.json['rpi.awb']['sensitivity_r'] = 1.0
self.json['rpi.awb']['sensitivity_b'] = 1.0
self.json['rpi.awb']['transverse_pos'] = transverse_pos
self.json['rpi.awb']['transverse_neg'] = transverse_neg
self.log += '\nAWB calibration written to json file'
print('Finished AWB calibration')
"""
Auto lens shading correction completely mitigates the effects of lens shading for ech
colour channel seperately, and then partially corrects for vignetting.
The extent of the correction depends on the 'luminance_strength' parameter.
"""
def alsc_cal(self,luminance_strength,do_alsc_colour):
if 'rpi.alsc' in self.disable:
return 1
print('\nStarting ALSC calibration')
self.log_new_sec('ALSC')
"""
check if alsc images have been taken
"""
if len(self.imgs_alsc) == 0:
print('\nError:\nNo alsc calibration images found')
self.log += '\nERROR: No ALSC calibration images found!'
self.log += '\nALSC calibration aborted!'
return 1
self.json['rpi.alsc']['luminance_strength'] = luminance_strength
if self.grey and do_alsc_colour:
print('Greyscale camera so only luminance_lut calculated')
do_alsc_colour = False
self.log += '\nWARNING: ALSC colour correction cannot be done on '
self.log += 'greyscale image!\nALSC colour corrections forced off!'
"""
call calibration function
"""
plot = "rpi.alsc" in self.plot
alsc_out = alsc_all(self,do_alsc_colour,plot)
cal_cr_list,cal_cb_list,luminance_lut,av_corn = alsc_out
"""
write ouput to json and finish if not do_alsc_colour
"""
if not do_alsc_colour:
self.json['rpi.alsc']['luminance_lut'] = luminance_lut
self.json['rpi.alsc']['n_iter'] = 0
self.log += '\nALSC calibrations written to json file'
self.log += '\nNo colour calibrations performed'
print('Finished ALSC calibrations')
return 1
self.json['rpi.alsc']['calibrations_Cr'] = cal_cr_list
self.json['rpi.alsc']['calibrations_Cb'] = cal_cb_list
self.json['rpi.alsc']['luminance_lut'] = luminance_lut
self.log += '\nALSC colour and luminance tables written to json file'
"""
The sigmas determine the strength of the adaptive algorithm, that
cleans up any lens shading that has slipped through the alsc. These are
determined by measuring a 'worst-case' difference between two alsc tables
that are adjacent in colour space. If, however, only one colour
temperature has been provided, then this difference can not be computed
as only one table is available.
To determine the sigmas you would have to estimate the error of an alsc
table with only the image it was taken on as a check. To avoid circularity,
dfault exaggerated sigmas are used, which can result in too much alsc and
is therefore not advised.
In general, just take another alsc picture at another colour temperature!
"""
if len(self.imgs_alsc) == 1:
self.json['rpi.alsc']['sigma'] = 0.005
self.json['rpi.alsc']['sigma_Cb'] = 0.005
print('\nWarning:\nOnly one alsc calibration found' +
'\nStandard sigmas used for adaptive algorithm.')
print('Finished ALSC calibrations')
self.log += '\nWARNING: Only one colour temperature found in '
self.log += 'calibration images.\nStandard sigmas used for adaptive '
self.log += 'algorithm!'
return 1
"""
obtain worst-case scenario residual sigmas
"""
sigma_r,sigma_b = get_sigma(self,cal_cr_list,cal_cb_list)
"""
write output to json
"""
self.json['rpi.alsc']['sigma'] = np.round(sigma_r,5)
self.json['rpi.alsc']['sigma_Cb'] = np.round(sigma_b,5)
self.log += '\nCalibrated sigmas written to json file'
print('Finished ALSC calibrations')
"""
Green equalisation fixes problems caused by discrepancies in green
channels. This is done by measuring the effect on macbeth chart patches,
which ideally would have the same green values throughout.
An upper bound linear model is fit, fixing a threshold for the green
differences that are corrected.
"""
def geq_cal(self):
if 'rpi.geq' in self.disable:
return 1
print('\nStarting GEQ calibrations')
self.log_new_sec('GEQ')
"""
perform calibration
"""
plot = 'rpi.geq' in self.plot
slope,offset = geq_fit(self,plot)
"""
write output to json
"""
self.json['rpi.geq']['offset'] = offset
self.json['rpi.geq']['slope'] = slope
self.log += '\nGEQ calibrations written to json file'
print('Finished GEQ calibrations')
"""
Lux calibrations allow the lux level of a scene to be estimated by a ratio
calculation. Lux values are used in the pipeline for algorithms such as AGC
and AWB
"""
def lux_cal(self):
if 'rpi.lux' in self.disable:
return 1
print('\nStarting LUX calibrations')
self.log_new_sec('LUX')
"""
The lux calibration is done on a single image. For best effects, the
image with lux level closest to 1000 is chosen.
"""
luxes = [Img.lux for Img in self.imgs]
argmax = luxes.index(min(luxes, key=lambda l:abs(1000-l)))
Img = self.imgs[argmax]
self.log += '\nLux found closest to 1000: {} lx'.format(Img.lux)
self.log += '\nImage used: ' + Img.name
if Img.lux < 50:
self.log += '\nWARNING: Low lux could cause inaccurate calibrations!'
"""
do calibration
"""
lux_out,shutter_speed,gain = lux(self,Img)
"""
write output to json
"""
self.json['rpi.lux']['reference_shutter_speed'] = shutter_speed
self.json['rpi.lux']['reference_gain'] = gain
self.json['rpi.lux']['reference_lux'] = Img.lux
self.json['rpi.lux']['reference_Y'] = lux_out
self.log += '\nLUX calibrations written to json file'
print('Finished LUX calibrations')
"""
Noise alibration attempts to describe the noise profile of the sensor. The
calibration is run on macbeth images and the final output is taken as the average
"""
def noise_cal(self):
if 'rpi.noise' in self.disable:
return 1
print('\nStarting NOISE calibrations')
self.log_new_sec('NOISE')
"""
run calibration on all images and sort by slope.
"""
plot = "rpi.noise" in self.plot
noise_out = sorted([noise(self,Img,plot) for Img in self.imgs], key = lambda x:x[0])
self.log += '\nFinished processing images'
"""
take the average of the interquartile
"""
l = len(noise_out)
noise_out = np.mean(noise_out[l//4:1+3*l//4],axis=0)
self.log += '\nAverage noise profile: constant = {} '.format(int(noise_out[1]))
self.log += 'slope = {:.3f}'.format(noise_out[0])
"""
write to json
"""
self.json['rpi.noise']['reference_constant'] = int(noise_out[1])
self.json['rpi.noise']['reference_slope'] = round(noise_out[0],3)
self.log += '\nNOISE calibrations written to json'
print('Finished NOISE calibrations')
"""
Removes json entries that are turned off
"""
def json_remove(self,disable):
self.log_new_sec('Disabling Options',cal=False)
if len(self.disable) == 0:
self.log += '\nNothing disabled!'
return 1
for key in disable:
try:
del self.json[key]
self.log += '\nDisabled: '+key
except KeyError:
self.log += '\nERROR: '+key +' not found!'
"""
writes the json dictionary to the raw json file then make pretty
"""
def write_json(self):
"""
Write json dictionary to file
"""
jstring = json.dumps(self.json,sort_keys=False)
"""
make it pretty :)
"""
pretty_print_json(jstring,self.jf)
"""
add a new section to the log file
"""
def log_new_sec(self,section,cal=True):
self.log += '\n'+self.log_separator
self.log += section
if cal:
self.log += ' Calibration'
self.log += self.log_separator
"""
write script arguments to log file
"""
def log_user_input(self,json_output,directory,config,log_output):
self.log_new_sec('User Arguments',cal=False)
self.log += '\nJson file output: ' + json_output
self.log += '\nCalibration images directory: ' + directory
if config == None:
self.log += '\nNo configuration file input... using default options'
elif config == False:
self.log += '\nWARNING: Invalid configuration file path...'
self.log += ' using default options'
elif config == True:
self.log += '\nWARNING: Invalid syntax in configuration file...'
self.log += ' using default options'
else:
self.log += '\nConfiguration file: ' + config
if log_output == None:
self.log += '\nNo log file path input... using default: ctt_log.txt'
else:
self.log += '\nLog file output: ' + log_output
# if log_output
"""
write log file
"""
def write_log(self,filename):
if filename == None:
filename = 'ctt_log.txt'
self.log += '\n' + self.log_separator
with open(filename,'w') as logfile:
logfile.write(self.log)
"""
Add all images from directory, pass into relevant list of images and
extrace lux and temperature values.
"""
def add_imgs(self,directory,mac_config,blacklevel=-1):
self.log_new_sec('Image Loading',cal=False)
img_suc_msg = 'Image loaded successfully!'
print('\n\nLoading images from '+directory)
self.log += '\nDirectory: ' + directory
"""
get list of files
"""
filename_list = get_photos(directory)
print("Files found: {}".format(len(filename_list)))
self.log += '\nFiles found: {}'.format(len(filename_list))
"""
iterate over files
"""
filename_list.sort()
for filename in filename_list:
address = directory + filename
print('\nLoading image: '+filename)
self.log += '\n\nImage: ' + filename
"""
obtain colour and lux value
"""
col,lux = get_col_lux(filename)
"""
Check if image is an alsc calibration image
"""
if 'alsc' in filename:
Img = load_image(self,address,mac=False)
self.log += '\nIdentified as an ALSC image'
"""
check if imagae data has been successfully unpacked
"""
if Img == 0:
print('\nDISCARDED')
self.log += '\nImage discarded!'
continue
"""
check that image colour temperature has been successfuly obtained
"""
elif col != None:
"""
if successful, append to list and continue to next image
"""
Img.col = col
Img.name = filename
self.log += '\nColour temperature: {} K'.format(col)
self.imgs_alsc.append(Img)
if blacklevel != -1:
Img.blacklevel_16 = blacklevel
print(img_suc_msg)
continue
else:
print('Error! No colour temperature found!')
self.log += '\nWARNING: Error reading colour temperature'
self.log += '\nImage discarded!'
print('DISCARDED')
else:
self.log += '\nIdentified as macbeth chart image'
"""
if image isn't an alsc correction then it must have a lux and a
colour temperature value to be useful
"""
if lux == None:
print('DISCARDED')
self.log += '\nWARNING: Error reading lux value'
self.log += '\nImage discarded!'
continue
Img = load_image(self,address,mac_config)
"""
check that image data has been successfuly unpacked
"""
if Img == 0:
print('DISCARDED')
self.log += '\nImage discarded!'
continue
else:
"""
if successful, append to list and continue to next image
"""
Img.col,Img.lux = col,lux
Img.name = filename
self.log += '\nColour temperature: {} K'.format(col)
self.log += '\nLux value: {} lx'.format(lux)
if blacklevel != -1:
Img.blacklevel_16 = blacklevel
print(img_suc_msg)
self.imgs.append(Img)
print('\nFinished loading images')
"""
Check that usable images have been found
Possible errors include:
- no macbeth chart
- incorrect filename/extension
- images from different cameras
"""
def check_imgs(self):
self.log += '\n\nImages found:'
self.log += '\nMacbeth : {}'.format(len(self.imgs))
self.log += '\nALSC : {} '.format(len(self.imgs_alsc))
self.log += '\n\nCamera metadata'
"""
check usable images found
"""
if len(self.imgs) == 0:
print('\nERROR: No usable macbeth chart images found')
self.log += '\nERROR: No usable macbeth chart images found'
return 0
"""
Double check that every image has come from the same camera...
"""
all_imgs = self.imgs + self.imgs_alsc
camNames = list(set([Img.camName for Img in all_imgs]))
patterns = list(set([Img.pattern for Img in all_imgs]))
sigbitss = list(set([Img.sigbits for Img in all_imgs]))
blacklevels = list(set([Img.blacklevel_16 for Img in all_imgs]))
sizes = list(set([(Img.w,Img.h) for Img in all_imgs]))
if len(camNames)==1 and len(patterns)==1 and len(sigbitss)==1 and len(blacklevels) ==1 and len(sizes)== 1:
self.grey = (patterns[0] == 128)
self.blacklevel_16 = blacklevels[0]
self.log += '\nName: {}'.format(camNames[0])
self.log += '\nBayer pattern case: {}'.format(patterns[0])
if self.grey:
self.log += '\nGreyscale camera identified'
self.log += '\nSignificant bits: {}'.format(sigbitss[0])
self.log += '\nBlacklevel: {}'.format(blacklevels[0])
self.log += '\nImage size: w = {} h = {}'.format(sizes[0][0],sizes[0][1])
return 1
else:
print('\nERROR: Images from different cameras')
self.log += '\nERROR: Images are from different cameras'
return 0
def run_ctt(json_output,directory,config,log_output):
"""
check input files are jsons
"""
if json_output[-5:] != '.json':
raise ArgError('\n\nError: Output must be a json file!')
if config != None:
"""
check if config file is actually a json
"""
if config[-5:] != '.json':
raise ArgError('\n\nError: Config file must be a json file!')
"""
read configurations
"""
try:
with open(config,'r') as config_json:
configs = json.load(config_json)
except FileNotFoundError:
configs = {}
config = False
except json.decoder.JSONDecodeError:
configs = {}
config = True
else:
configs = {}
"""
load configurations from config file, if not given then set default
"""
disable = get_config(configs,"disable",[],'list')
plot = get_config(configs,"plot",[],'list')
awb_d = get_config(configs,"awb",{},'dict')
greyworld = get_config(awb_d,"greyworld",0,'bool')
alsc_d = get_config(configs,"alsc",{},'dict')
do_alsc_colour = get_config(alsc_d,"do_alsc_colour",1,'bool')
luminance_strength = get_config(alsc_d,"luminance_strength",0.5,'num')
blacklevel = get_config(configs,"blacklevel",-1,'num')
macbeth_d = get_config(configs,"macbeth",{},'dict')
mac_small = get_config(macbeth_d,"small",0,'bool')
mac_show = get_config(macbeth_d,"show",0,'bool')
mac_config = (mac_small,mac_show)
if blacklevel < -1 or blacklevel >= 2**16:
print('\nInvalid blacklevel, defaulted to 64')
blacklevel = -1
if luminance_strength < 0 or luminance_strength > 1:
print('\nInvalid luminance_strength strength, defaulted to 0.5')
luminance_strength = 0.5
"""
sanitise directory path
"""
if directory[-1] != '/':
directory += '/'
"""
initialise tuning tool and load images
"""
try:
Cam = Camera(json_output)
Cam.log_user_input(json_output,directory,config,log_output)
Cam.disable = disable
Cam.plot = plot
Cam.add_imgs(directory,mac_config,blacklevel)
except FileNotFoundError:
raise ArgError('\n\nError: Input image directory not found!')
"""
preform calibrations as long as check_imgs returns True
If alsc is activated then it must be done before awb and ccm since the alsc
tables are used in awb and ccm calibrations
ccm also technically does an awb but it measures this from the macbeth
chart in the image rather than using calibration data
"""
if Cam.check_imgs():
Cam.json['rpi.black_level']['black_level'] = Cam.blacklevel_16
Cam.json_remove(disable)
print('\nSTARTING CALIBRATIONS')
Cam.alsc_cal(luminance_strength,do_alsc_colour)
Cam.geq_cal()
Cam.lux_cal()
Cam.noise_cal()
Cam.awb_cal(greyworld,do_alsc_colour)
Cam.ccm_cal(do_alsc_colour)
print('\nFINISHED CALIBRATIONS')
Cam.write_json()
Cam.write_log(log_output)
print('\nCalibrations written to: '+json_output)
if log_output == None:
log_output = 'ctt_log.txt'
print('Log file written to: '+log_output)
pass
else:
Cam.write_log(log_output)
if __name__ == '__main__':
"""
initialise calibration
"""
if len(sys.argv) == 1:
print("""
Pisp Camera Tuning Tool version 1.0
Required Arguments:
'-i' : Calibration image directory.
'-o' : Name of output json file.
Optional Arguments:
'-c' : Config file for the CTT. If not passed, default parameters used.
'-l' : Name of output log file. If not passed, 'ctt_log.txt' used.
""")
quit(0)
else:
"""
parse input arguments
"""
json_output,directory,config,log_output = parse_input()
run_ctt(json_output,directory,config,log_output)

View file

@ -0,0 +1,297 @@
# SPDX-License-Identifier: BSD-2-Clause
#
# Copyright (C) 2019, Raspberry Pi (Trading) Limited
#
# ctt_alsc.py - camera tuning tool for ALSC (auto lens shading correction)
from ctt_image_load import *
import matplotlib.pyplot as plt
from matplotlib import cm
from mpl_toolkits.mplot3d import Axes3D
"""
preform alsc calibration on a set of images
"""
def alsc_all(Cam,do_alsc_colour,plot):
imgs_alsc = Cam.imgs_alsc
"""
create list of colour temperatures and associated calibration tables
"""
list_col = []
list_cr = []
list_cb = []
list_cg = []
for Img in imgs_alsc:
col,cr,cb,cg,size = alsc(Cam,Img,do_alsc_colour,plot)
list_col.append(col)
list_cr.append(cr)
list_cb.append(cb)
list_cg.append(cg)
Cam.log += '\n'
Cam.log += '\nFinished processing images'
w,h,dx,dy = size
Cam.log += '\nChannel dimensions: w = {} h = {}'.format(int(w),int(h))
Cam.log += '\n16x12 grid rectangle size: w = {} h = {}'.format(dx,dy)
"""
convert to numpy array for data manipulation
"""
list_col = np.array(list_col)
list_cr = np.array(list_cr)
list_cb = np.array(list_cb)
list_cg = np.array(list_cg)
cal_cr_list = []
cal_cb_list = []
"""
only do colour calculations if required
"""
if do_alsc_colour:
Cam.log += '\nALSC colour tables'
for ct in sorted(set(list_col)):
Cam.log += '\nColour temperature: {} K'.format(ct)
"""
average tables for the same colour temperature
"""
indices = np.where(list_col == ct)
ct = int(ct)
t_r = np.mean(list_cr[indices],axis=0)
t_b = np.mean(list_cb[indices],axis=0)
"""
force numbers to be stored to 3dp.... :(
"""
t_r = np.where((100*t_r)%1<=0.05, t_r+0.001,t_r)
t_b = np.where((100*t_b)%1<=0.05, t_b+0.001,t_b)
t_r = np.where((100*t_r)%1>=0.95, t_r-0.001,t_r)
t_b = np.where((100*t_b)%1>=0.95, t_b-0.001,t_b)
t_r = np.round(t_r,3)
t_b = np.round(t_b,3)
r_corners = (t_r[0],t_r[15],t_r[-1],t_r[-16])
b_corners = (t_b[0],t_b[15],t_b[-1],t_b[-16])
r_cen = t_r[5*16+7]+t_r[5*16+8]+t_r[6*16+7]+t_r[6*16+8]
r_cen = round(r_cen/4,3)
b_cen = t_b[5*16+7]+t_b[5*16+8]+t_b[6*16+7]+t_b[6*16+8]
b_cen = round(b_cen/4,3)
Cam.log += '\nRed table corners: {}'.format(r_corners)
Cam.log += '\nRed table centre: {}'.format(r_cen)
Cam.log += '\nBlue table corners: {}'.format(b_corners)
Cam.log += '\nBlue table centre: {}'.format(b_cen)
cr_dict = {
'ct':ct,
'table':list(t_r)
}
cb_dict = {
'ct':ct,
'table':list(t_b)
}
cal_cr_list.append(cr_dict)
cal_cb_list.append(cb_dict)
Cam.log += '\n'
else:
cal_cr_list,cal_cb_list = None,None
"""
average all values for luminance shading and return one table for all temperatures
"""
lum_lut = np.mean(list_cg,axis=0)
lum_lut = np.where((100*lum_lut)%1<=0.05,lum_lut+0.001,lum_lut)
lum_lut = np.where((100*lum_lut)%1>=0.95,lum_lut-0.001,lum_lut)
lum_lut = list(np.round(lum_lut,3))
"""
calculate average corner for lsc gain calculation further on
"""
corners = (lum_lut[0],lum_lut[15],lum_lut[-1],lum_lut[-16])
Cam.log += '\nLuminance table corners: {}'.format(corners)
l_cen = lum_lut[5*16+7]+lum_lut[5*16+8]+lum_lut[6*16+7]+lum_lut[6*16+8]
l_cen = round(l_cen/4,3)
Cam.log += '\nLuminance table centre: {}'.format(l_cen)
av_corn = np.sum(corners)/4
return cal_cr_list,cal_cb_list,lum_lut,av_corn
"""
calculate g/r and g/b for 32x32 points arranged in a grid for a single image
"""
def alsc(Cam,Img,do_alsc_colour,plot=False):
Cam.log += '\nProcessing image: ' + Img.name
"""
get channel in correct order
"""
channels = [Img.channels[i] for i in Img.order]
"""
calculate size of single rectangle.
-(-(w-1)//32) is a ceiling division. w-1 is to deal robustly with the case
where w is a multiple of 32.
"""
w,h = Img.w/2,Img.h/2
dx,dy = int(-(-(w-1)//16)),int(-(-(h-1)//12))
"""
average the green channels into one
"""
av_ch_g = np.mean((channels[1:2]),axis = 0)
if do_alsc_colour:
"""
obtain 16x12 grid of intensities for each channel and subtract black level
"""
g = get_16x12_grid(av_ch_g,dx,dy) - Img.blacklevel_16
r = get_16x12_grid(channels[0],dx,dy) - Img.blacklevel_16
b = get_16x12_grid(channels[3],dx,dy) - Img.blacklevel_16
"""
calculate ratios as 32 bit in order to be supported by medianBlur function
"""
cr = np.reshape(g/r,(12,16)).astype('float32')
cb = np.reshape(g/b,(12,16)).astype('float32')
cg = np.reshape(1/g,(12,16)).astype('float32')
"""
median blur to remove peaks and save as float 64
"""
cr = cv2.medianBlur(cr,3).astype('float64')
cb = cv2.medianBlur(cb,3).astype('float64')
cg = cv2.medianBlur(cg,3).astype('float64')
cg = cg/np.min(cg)
"""
debugging code showing 2D surface plot of vignetting. Quite useful for
for sanity check
"""
if plot:
hf = plt.figure(figsize=(8,8))
ha = hf.add_subplot(311, projection='3d')
"""
note Y is plotted as -Y so plot has same axes as image
"""
X,Y = np.meshgrid(range(16),range(12))
ha.plot_surface(X,-Y,cr,cmap=cm.coolwarm,linewidth=0)
ha.set_title('ALSC Plot\nImg: {}\n\ncr'.format(Img.str))
hb = hf.add_subplot(312, projection='3d')
hb.plot_surface(X,-Y,cb,cmap=cm.coolwarm,linewidth=0)
hb.set_title('cb')
hc = hf.add_subplot(313, projection='3d')
hc.plot_surface(X,-Y,cg,cmap=cm.coolwarm,linewidth=0)
hc.set_title('g')
# print(Img.str)
plt.show()
return Img.col,cr.flatten(),cb.flatten(),cg.flatten(),(w,h,dx,dy)
else:
"""
only perform calculations for luminance shading
"""
g = get_16x12_grid(av_ch_g,dx,dy) - Img.blacklevel_16
cg = np.reshape(1/g,(12,16)).astype('float32')
cg = cv2.medianBlur(cg,3).astype('float64')
cg = cg/np.min(cg)
if plot:
hf = plt.figure(figssize=(8,8))
ha = hf.add_subplot(1,1,1,projection='3d')
X,Y = np.meashgrid(range(16),range(12))
ha.plot_surface(X,-Y,cg,cmap=cm.coolwarm,linewidth=0)
ha.set_title('ALSC Plot (Luminance only!)\nImg: {}\n\ncg').format(Img.str)
plt.show()
return Img.col,None,None,cg.flatten(),(w,h,dx,dy)
"""
Compresses channel down to a 16x12 grid
"""
def get_16x12_grid(chan,dx,dy):
grid = []
"""
since left and bottom border will not necessarily have rectangles of
dimension dx x dy, the 32nd iteration has to be handled separately.
"""
for i in range(11):
for j in range(15):
grid.append(np.mean(chan[dy*i:dy*(1+i),dx*j:dx*(1+j)]))
grid.append(np.mean(chan[dy*i:dy*(1+i),15*dx:]))
for j in range(15):
grid.append(np.mean(chan[11*dy:,dx*j:dx*(1+j)]))
grid.append(np.mean(chan[11*dy:,15*dx:]))
"""
return as np.array, ready for further manipulation
"""
return np.array(grid)
"""
obtains sigmas for red and blue, effectively a measure of the 'error'
"""
def get_sigma(Cam,cal_cr_list,cal_cb_list):
Cam.log += '\nCalculating sigmas'
"""
provided colour alsc tables were generated for two different colour
temperatures sigma is calculated by comparing two calibration temperatures
adjacent in colour space
"""
"""
create list of colour temperatures
"""
cts = [cal['ct'] for cal in cal_cr_list]
# print(cts)
"""
calculate sigmas for each adjacent cts and return worst one
"""
sigma_rs = []
sigma_bs = []
for i in range(len(cts)-1):
sigma_rs.append(calc_sigma(cal_cr_list[i]['table'],cal_cr_list[i+1]['table']))
sigma_bs.append(calc_sigma(cal_cb_list[i]['table'],cal_cb_list[i+1]['table']))
Cam.log += '\nColour temperature interval {} - {} K'.format(cts[i],cts[i+1])
Cam.log += '\nSigma red: {}'.format(sigma_rs[-1])
Cam.log += '\nSigma blue: {}'.format(sigma_bs[-1])
"""
return maximum sigmas, not necessarily from the same colour temperature
interval
"""
sigma_r = max(sigma_rs) if sigma_rs else 0.005
sigma_b = max(sigma_bs) if sigma_bs else 0.005
Cam.log += '\nMaximum sigmas: Red = {} Blue = {}'.format(sigma_r, sigma_b)
# print(sigma_rs,sigma_bs)
# print(sigma_r,sigma_b)
return sigma_r,sigma_b
"""
calculate sigma from two adjacent gain tables
"""
def calc_sigma(g1,g2):
"""
reshape into 16x12 matrix
"""
g1 = np.reshape(g1,(12,16))
g2 = np.reshape(g2,(12,16))
"""
apply gains to gain table
"""
gg = g1/g2
if np.mean(gg) < 1:
gg = 1/gg
"""
for each internal patch, compute average difference between it and its 4
neighbours, then append to list
"""
diffs = []
for i in range(10):
for j in range(14):
"""
note indexing is incremented by 1 since all patches on borders are
not counted
"""
diff = np.abs(gg[i+1][j+1]-gg[i][j+1])
diff += np.abs(gg[i+1][j+1]-gg[i+2][j+1])
diff += np.abs(gg[i+1][j+1]-gg[i+1][j])
diff += np.abs(gg[i+1][j+1]-gg[i+1][j+2])
diffs.append(diff/4)
"""
return mean difference
"""
mean_diff = np.mean(diffs)
return(np.round(mean_diff,5))

View file

@ -0,0 +1,374 @@
# SPDX-License-Identifier: BSD-2-Clause
#
# Copyright (C) 2019, Raspberry Pi (Trading) Limited
#
# ctt_awb.py - camera tuning tool for AWB
from ctt_image_load import *
import matplotlib.pyplot as plt
from bisect import bisect_left
from scipy.optimize import fmin
"""
obtain piecewise linear approximation for colour curve
"""
def awb(Cam,cal_cr_list,cal_cb_list,plot):
imgs = Cam.imgs
"""
condense alsc calibration tables into one dictionary
"""
if cal_cr_list == None:
colour_cals = None
else:
colour_cals = {}
for cr,cb in zip(cal_cr_list,cal_cb_list):
cr_tab = cr['table']
cb_tab = cb['table']
"""
normalise tables so min value is 1
"""
cr_tab= cr_tab/np.min(cr_tab)
cb_tab= cb_tab/np.min(cb_tab)
colour_cals[cr['ct']] = [cr_tab,cb_tab]
"""
obtain data from greyscale macbeth patches
"""
rb_raw = []
rbs_hat = []
for Img in imgs:
Cam.log += '\nProcessing '+Img.name
"""
get greyscale patches with alsc applied if alsc enabled.
Note: if alsc is disabled then colour_cals will be set to None and the
function will just return the greyscale patches
"""
r_patchs,b_patchs,g_patchs = get_alsc_patches(Img,colour_cals)
"""
calculate ratio of r,b to g
"""
r_g = np.mean(r_patchs/g_patchs)
b_g = np.mean(b_patchs/g_patchs)
Cam.log += '\n r : {:.4f} b : {:.4f}'.format(r_g,b_g)
"""
The curve tends to be better behaved in so-called hatspace.
R,B,G represent the individual channels. The colour curve is plotted in
r,b space, where:
r = R/G
b = B/G
This will be referred to as dehatspace... (sorry)
Hatspace is defined as:
r_hat = R/(R+B+G)
b_hat = B/(R+B+G)
To convert from dehatspace to hastpace (hat operation):
r_hat = r/(1+r+b)
b_hat = b/(1+r+b)
To convert from hatspace to dehatspace (dehat operation):
r = r_hat/(1-r_hat-b_hat)
b = b_hat/(1-r_hat-b_hat)
Proof is left as an excercise to the reader...
Throughout the code, r and b are sometimes referred to as r_g and b_g
as a reminder that they are ratios
"""
r_g_hat = r_g/(1+r_g+b_g)
b_g_hat = b_g/(1+r_g+b_g)
Cam.log += '\n r_hat : {:.4f} b_hat : {:.4f}'.format(r_g_hat,b_g_hat)
rbs_hat.append((r_g_hat,b_g_hat,Img.col))
rb_raw.append((r_g,b_g))
Cam.log += '\n'
Cam.log += '\nFinished processing images'
"""
sort all lits simultaneously by r_hat
"""
rbs_zip = list(zip(rbs_hat,rb_raw))
rbs_zip.sort(key=lambda x:x[0][0])
rbs_hat,rb_raw = list(zip(*rbs_zip))
"""
unzip tuples ready for processing
"""
rbs_hat = list(zip(*rbs_hat))
rb_raw = list(zip(*rb_raw))
"""
fit quadratic fit to r_g hat and b_g_hat
"""
a,b,c = np.polyfit(rbs_hat[0],rbs_hat[1],2)
Cam.log += '\nFit quadratic curve in hatspace'
"""
the algorithm now approximates the shortest distance from each point to the
curve in dehatspace. Since the fit is done in hatspace, it is easier to
find the actual shortest distance in hatspace and use the projection back
into dehatspace as an overestimate.
The distance will be used for two things:
1) In the case that colour temperature does not strictly decrease with
increasing r/g, the closest point to the line will be chosen out of an
increasing pair of colours.
2) To calculate transverse negative an dpositive, the maximum positive
and negative distance from the line are chosen. This benefits from the
overestimate as the transverse pos/neg are upper bound values.
"""
"""
define fit function
"""
def f(x):
return a*x**2 + b*x + c
"""
iterate over points (R,B are x and y coordinates of points) and calculate
distance to line in dehatspace
"""
dists = []
for i, (R,B) in enumerate(zip(rbs_hat[0],rbs_hat[1])):
"""
define function to minimise as square distance between datapoint and
point on curve. Squaring is monotonic so minimising radius squared is
equivalent to minimising radius
"""
def f_min(x):
y = f(x)
return((x-R)**2+(y-B)**2)
"""
perform optimisation with scipy.optmisie.fmin
"""
x_hat = fmin(f_min,R,disp=0)[0]
y_hat = f(x_hat)
"""
dehat
"""
x = x_hat/(1-x_hat-y_hat)
y = y_hat/(1-x_hat-y_hat)
rr = R/(1-R-B)
bb = B/(1-R-B)
"""
calculate euclidean distance in dehatspace
"""
dist = ((x-rr)**2+(y-bb)**2)**0.5
"""
return negative if point is below the fit curve
"""
if (x+y) > (rr+bb):
dist *= -1
dists.append(dist)
Cam.log += '\nFound closest point on fit line to each point in dehatspace'
"""
calculate wiggle factors in awb. 10% added since this is an upper bound
"""
transverse_neg = - np.min(dists) * 1.1
transverse_pos = np.max(dists) * 1.1
Cam.log += '\nTransverse pos : {:.5f}'.format(transverse_pos)
Cam.log += '\nTransverse neg : {:.5f}'.format(transverse_neg)
"""
set minimum transverse wiggles to 0.1 .
Wiggle factors dictate how far off of the curve the algorithm searches. 0.1
is a suitable minimum that gives better results for lighting conditions not
within calibration dataset. Anything less will generalise poorly.
"""
if transverse_pos < 0.01:
transverse_pos = 0.01
Cam.log += '\nForced transverse pos to 0.01'
if transverse_neg < 0.01:
transverse_neg = 0.01
Cam.log += '\nForced transverse neg to 0.01'
"""
generate new b_hat values at each r_hat according to fit
"""
r_hat_fit = np.array(rbs_hat[0])
b_hat_fit = a*r_hat_fit**2 + b*r_hat_fit + c
"""
transform from hatspace to dehatspace
"""
r_fit = r_hat_fit/(1-r_hat_fit-b_hat_fit)
b_fit = b_hat_fit/(1-r_hat_fit-b_hat_fit)
c_fit = np.round(rbs_hat[2],0)
"""
round to 4dp
"""
r_fit = np.where((1000*r_fit)%1<=0.05,r_fit+0.0001,r_fit)
r_fit = np.where((1000*r_fit)%1>=0.95,r_fit-0.0001,r_fit)
b_fit = np.where((1000*b_fit)%1<=0.05,b_fit+0.0001,b_fit)
b_fit = np.where((1000*b_fit)%1>=0.95,b_fit-0.0001,b_fit)
r_fit = np.round(r_fit,4)
b_fit = np.round(b_fit,4)
"""
The following code ensures that colour temperature decreases with
increasing r/g
"""
"""
iterate backwards over list for easier indexing
"""
i = len(c_fit) - 1
while i > 0 :
if c_fit[i] > c_fit[i-1]:
Cam.log += '\nColour temperature increase found\n'
Cam.log += '{} K at r = {} to '.format(c_fit[i-1],r_fit[i-1])
Cam.log += '{} K at r = {}'.format(c_fit[i],r_fit[i])
"""
if colour temperature increases then discard point furthest from
the transformed fit (dehatspace)
"""
error_1 = abs(dists[i-1])
error_2 = abs(dists[i])
Cam.log += '\nDistances from fit:\n'
Cam.log += '{} K : {:.5f} , '.format(c_fit[i],error_1)
Cam.log += '{} K : {:.5f}'.format(c_fit[i-1],error_2)
"""
find bad index
note that in python false = 0 and true = 1
"""
bad = i - (error_1<error_2)
Cam.log += '\nPoint at {} K deleted as '.format(c_fit[bad])
Cam.log += 'it is furthest from fit'
"""
delete bad point
"""
r_fit = np.delete(r_fit,bad)
b_fit = np.delete(b_fit,bad)
c_fit = np.delete(c_fit,bad).astype(np.uint16)
"""
note that if a point has been discarded then the length has decreased
by one, meaning that decreasing the index by one will reassess the kept
point against the next point. It is therefore possible, in theory, for
two adjacent points to be discarded, although probably rare
"""
i -= 1
"""
return formatted ct curve, ordered by increasing colour temperature
"""
ct_curve = list(np.array(list(zip(b_fit,r_fit,c_fit))).flatten())[::-1]
Cam.log += '\nFinal CT curve:'
for i in range(len(ct_curve)//3):
j = 3*i
Cam.log += '\n ct: {} '.format(ct_curve[j])
Cam.log += ' r: {} '.format(ct_curve[j+1])
Cam.log += ' b: {} '.format(ct_curve[j+2])
"""
plotting code for debug
"""
if plot:
x = np.linspace(np.min(rbs_hat[0]),np.max(rbs_hat[0]),100)
y = a*x**2 + b*x + c
plt.subplot(2,1,1)
plt.title('hatspace')
plt.plot(rbs_hat[0],rbs_hat[1],ls='--',color='blue')
plt.plot(x,y,color='green',ls='-')
plt.scatter(rbs_hat[0],rbs_hat[1],color='red')
for i, ct in enumerate(rbs_hat[2]):
plt.annotate(str(ct),(rbs_hat[0][i],rbs_hat[1][i]))
plt.xlabel('$\hat{r}$')
plt.ylabel('$\hat{b}$')
"""
optional set axes equal to shortest distance so line really does
looks perpendicular and everybody is happy
"""
# ax = plt.gca()
# ax.set_aspect('equal')
plt.grid()
plt.subplot(2,1,2)
plt.title('dehatspace - indoors?')
plt.plot(r_fit,b_fit,color='blue')
plt.scatter(rb_raw[0],rb_raw[1],color='green')
plt.scatter(r_fit,b_fit,color='red')
for i,ct in enumerate(c_fit):
plt.annotate(str(ct),(r_fit[i],b_fit[i]))
plt.xlabel('$r$')
plt.ylabel('$b$')
"""
optional set axes equal to shortest distance so line really does
looks perpendicular and everybody is happy
"""
# ax = plt.gca()
# ax.set_aspect('equal')
plt.subplots_adjust(hspace=0.5)
plt.grid()
plt.show()
"""
end of plotting code
"""
return(ct_curve,np.round(transverse_pos,5),np.round(transverse_neg,5))
"""
obtain greyscale patches and perform alsc colour correction
"""
def get_alsc_patches(Img,colour_cals,grey=True):
"""
get patch centre coordinates, image colour and the actual
patches for each channel,remembering to subtract blacklevel
If grey then only greyscale patches considered
"""
if grey:
cen_coords = Img.cen_coords[3::4]
col = Img.col
patches = [np.array(Img.patches[i]) for i in Img.order]
r_patchs = patches[0][3::4] - Img.blacklevel_16
b_patchs = patches[3][3::4] - Img.blacklevel_16
"""
note two green channels are averages
"""
g_patchs = (patches[1][3::4]+patches[2][3::4])/2 - Img.blacklevel_16
else:
cen_coords = Img.cen_coords
col = Img.col
patches = [np.array(Img.patches[i]) for i in Img.order]
r_patchs = patches[0] - Img.blacklevel_16
b_patchs = patches[3] - Img.blacklevel_16
g_patchs = (patches[1]+patches[2])/2 - Img.blacklevel_16
if colour_cals == None:
return r_patchs,b_patchs,g_patchs
"""
find where image colour fits in alsc colour calibration tables
"""
cts = list(colour_cals.keys())
pos = bisect_left(cts,col)
"""
if img colour is below minimum or above maximum alsc calibration colour, simply
pick extreme closest to img colour
"""
if pos%(len(cts)) == 0:
"""
this works because -0 = 0 = first and -1 = last index
"""
col_tabs = np.array(colour_cals[cts[-pos//len(cts)]])
"""
else, perform linear interpolation between existing alsc colour
calibration tables
"""
else:
bef = cts[pos-1]
aft = cts[pos]
da = col-bef
db = aft-col
bef_tabs = np.array(colour_cals[bef])
aft_tabs = np.array(colour_cals[aft])
col_tabs = (bef_tabs*db + aft_tabs*da)/(da+db)
col_tabs = np.reshape(col_tabs,(2,12,16))
"""
calculate dx,dy used to calculate alsc table
"""
w,h = Img.w/2,Img.h/2
dx,dy = int(-(-(w-1)//16)),int(-(-(h-1)//12))
"""
make list of pairs of gains for each patch by selecting the correct value
in alsc colour calibration table
"""
patch_gains = []
for cen in cen_coords:
x,y = cen[0]//dx, cen[1]//dy
# We could probably do with some better spatial interpolation here?
col_gains = (col_tabs[0][y][x],col_tabs[1][y][x])
patch_gains.append(col_gains)
"""
multiply the r and b channels in each patch by the respective gain, finally
performing the alsc colour correction
"""
for i,gains in enumerate(patch_gains):
r_patchs[i] = r_patchs[i] * gains[0]
b_patchs[i] = b_patchs[i] * gains[1]
"""
return greyscale patches, g channel and correct r,b channels
"""
return r_patchs,b_patchs,g_patchs

View file

@ -0,0 +1,221 @@
# SPDX-License-Identifier: BSD-2-Clause
#
# Copyright (C) 2019, Raspberry Pi (Trading) Limited
#
# ctt_ccm.py - camera tuning tool for CCM (colour correction matrix)
from ctt_image_load import *
from ctt_awb import get_alsc_patches
"""
takes 8-bit macbeth chart values, degammas and returns 16 bit
"""
def degamma(x):
x = x / ((2**8)-1)
x = np.where(x < 0.04045, x/12.92, ((x+0.055)/1.055)**2.4)
x = x * ((2**16)-1)
return x
"""
FInds colour correction matrices for list of images
"""
def ccm(Cam,cal_cr_list,cal_cb_list):
imgs = Cam.imgs
"""
standard macbeth chart colour values
"""
m_rgb = np.array([ # these are in sRGB
[116, 81, 67], # dark skin
[199, 147, 129], # light skin
[91, 122, 156], # blue sky
[90, 108, 64], # foliage
[130, 128, 176], # blue flower
[92, 190, 172], # bluish green
[224, 124, 47], # orange
[68, 91,170], # purplish blue
[198, 82, 97], # moderate red
[94, 58, 106], # purple
[159, 189, 63], # yellow green
[230, 162, 39], # orange yellow
[35, 63, 147], # blue
[67, 149, 74], # green
[180, 49, 57], # red
[238, 198, 20], # yellow
[193, 84, 151], # magenta
[0, 136, 170], # cyan (goes out of gamut)
[245, 245, 243], # white 9.5
[200, 202, 202], # neutral 8
[161, 163, 163], # neutral 6.5
[121, 121, 122], # neutral 5
[82, 84, 86], # neutral 3.5
[49, 49, 51] # black 2
])
"""
convert reference colours from srgb to rgb
"""
m_srgb = degamma(m_rgb)
"""
reorder reference values to match how patches are ordered
"""
m_srgb = np.array([m_srgb[i::6] for i in range(6)]).reshape((24,3))
"""
reformat alsc correction tables or set colour_cals to None if alsc is
deactivated
"""
if cal_cr_list == None:
colour_cals = None
else:
colour_cals = {}
for cr,cb in zip(cal_cr_list,cal_cb_list):
cr_tab = cr['table']
cb_tab = cb['table']
"""
normalise tables so min value is 1
"""
cr_tab= cr_tab/np.min(cr_tab)
cb_tab= cb_tab/np.min(cb_tab)
colour_cals[cr['ct']] = [cr_tab,cb_tab]
"""
for each image, perform awb and alsc corrections.
Then calculate the colour correction matrix for that image, recording the
ccm and the colour tempertaure.
"""
ccm_tab = {}
for Img in imgs:
Cam.log += '\nProcessing image: ' + Img.name
"""
get macbeth patches with alsc applied if alsc enabled.
Note: if alsc is disabled then colour_cals will be set to None and no
the function will simply return the macbeth patches
"""
r,b,g = get_alsc_patches(Img,colour_cals,grey=False)
"""
do awb
Note: awb is done by measuring the macbeth chart in the image, rather
than from the awb calibration. This is done so the awb will be perfect
and the ccm matrices will be more accurate.
"""
r_greys,b_greys,g_greys = r[3::4],b[3::4],g[3::4]
r_g = np.mean(r_greys/g_greys)
b_g = np.mean(b_greys/g_greys)
r = r / r_g
b = b / b_g
"""
normalise brightness wrt reference macbeth colours and then average
each channel for each patch
"""
gain = np.mean(m_srgb)/np.mean((r,g,b))
Cam.log += '\nGain with respect to standard colours: {:.3f}'.format(gain)
r = np.mean(gain*r,axis=1)
b = np.mean(gain*b,axis=1)
g = np.mean(gain*g,axis=1)
"""
calculate ccm matrix
"""
ccm = do_ccm(r,g,b,m_srgb)
"""
if a ccm has already been calculated for that temperature then don't
overwrite but save both. They will then be averaged later on
"""
if Img.col in ccm_tab.keys():
ccm_tab[Img.col].append(ccm)
else:
ccm_tab[Img.col] = [ccm]
Cam.log += '\n'
Cam.log += '\nFinished processing images'
"""
average any ccms that share a colour temperature
"""
for k,v in ccm_tab.items():
tab = np.mean(v,axis=0)
tab = np.where((10000*tab)%1<=0.05,tab+0.00001,tab)
tab = np.where((10000*tab)%1>=0.95,tab-0.00001,tab)
ccm_tab[k] = list(np.round(tab,5))
Cam.log += '\nMatrix calculated for colour temperature of {} K'.format(k)
"""
return all ccms with respective colour temperature in the correct format,
sorted by their colour temperature
"""
sorted_ccms = sorted(ccm_tab.items(),key=lambda kv: kv[0])
ccms = []
for i in sorted_ccms:
ccms.append({
'ct' : i[0],
'ccm' : i[1]
})
return ccms
"""
calculates the ccm for an individual image.
ccms are calculate in rgb space, and are fit by hand. Although it is a 3x3
matrix, each row must add up to 1 in order to conserve greyness, simplifying
calculation.
Should you want to fit them in another space (e.g. LAB) we wish you the best of
luck and send us the code when you are done! :-)
"""
def do_ccm(r,g,b,m_srgb):
rb = r-b
gb = g-b
rb_2s = (rb*rb)
rb_gbs = (rb*gb)
gb_2s = (gb*gb)
r_rbs = ( rb * (m_srgb[...,0] - b) )
r_gbs = ( gb * (m_srgb[...,0] - b) )
g_rbs = ( rb * (m_srgb[...,1] - b) )
g_gbs = ( gb * (m_srgb[...,1] - b) )
b_rbs = ( rb * (m_srgb[...,2] - b) )
b_gbs = ( gb * (m_srgb[...,2] - b) )
"""
Obtain least squares fit
"""
rb_2 = np.sum(rb_2s)
gb_2 = np.sum(gb_2s)
rb_gb = np.sum(rb_gbs)
r_rb = np.sum(r_rbs)
r_gb = np.sum(r_gbs)
g_rb = np.sum(g_rbs)
g_gb = np.sum(g_gbs)
b_rb = np.sum(b_rbs)
b_gb = np.sum(b_gbs)
det = rb_2*gb_2 - rb_gb*rb_gb
"""
Raise error if matrix is singular...
This shouldn't really happen with real data but if it does just take new
pictures and try again, not much else to be done unfortunately...
"""
if det < 0.001:
raise ArithmeticError
r_a = (gb_2*r_rb - rb_gb*r_gb)/det
r_b = (rb_2*r_gb - rb_gb*r_rb)/det
"""
Last row can be calculated by knowing the sum must be 1
"""
r_c = 1 - r_a - r_b
g_a = (gb_2*g_rb - rb_gb*g_gb)/det
g_b = (rb_2*g_gb - rb_gb*g_rb)/det
g_c = 1 - g_a - g_b
b_a = (gb_2*b_rb - rb_gb*b_gb)/det
b_b = (rb_2*b_gb - rb_gb*b_rb)/det
b_c = 1 - b_a - b_b
"""
format ccm
"""
ccm = [r_a,r_b,r_c,g_a,g_b,g_c,b_a,b_b,b_c]
return ccm

View file

@ -0,0 +1,16 @@
{
"disable": [],
"plot": [],
"alsc": {
"do_alsc_colour": 1,
"luminance_strength": 0.5
},
"awb": {
"greyworld": 0
},
"blacklevel": -1,
"macbeth": {
"small": 0,
"show": 0
}
}

View file

@ -0,0 +1,179 @@
# SPDX-License-Identifier: BSD-2-Clause
#
# Copyright (C) 2019, Raspberry Pi (Trading) Limited
#
# ctt_geq.py - camera tuning tool for GEQ (green equalisation)
from ctt_tools import *
import matplotlib.pyplot as plt
import scipy.optimize as optimize
"""
Uses green differences in macbeth patches to fit green equalisation threshold
model. Ideally, all macbeth chart centres would fall below the threshold as
these should be corrected by geq.
"""
def geq_fit(Cam,plot):
imgs = Cam.imgs
"""
green equalisation to mitigate mazing.
Fits geq model by looking at difference
between greens in macbeth patches
"""
geqs = np.array([ geq(Cam,Img)*Img.againQ8_norm for Img in imgs ])
Cam.log += '\nProcessed all images'
geqs = geqs.reshape((-1,2))
"""
data is sorted by green difference and top half is selected since higher
green difference data define the decision boundary.
"""
geqs = np.array(sorted(geqs,key = lambda r:np.abs((r[1]-r[0])/r[0])))
length = len(geqs)
g0 = geqs[length//2:,0]
g1 = geqs[length//2:,1]
gdiff = np.abs(g0-g1)
"""
find linear fit by minimising asymmetric least square errors
in order to cover most of the macbeth images.
the philosophy here is that every macbeth patch should fall within the
threshold, hence the upper bound approach
"""
def f(params):
m,c = params
a = gdiff - (m*g0+c)
"""
asymmetric square error returns:
1.95 * a**2 if a is positive
0.05 * a**2 if a is negative
"""
return(np.sum(a**2+0.95*np.abs(a)*a))
initial_guess = [0.01,500]
"""
Nelder-Mead is usually not the most desirable optimisation method
but has been chosen here due to its robustness to undifferentiability
(is that a word?)
"""
result = optimize.minimize(f,initial_guess,method='Nelder-Mead')
"""
need to check if the fit worked correectly
"""
if result.success:
slope,offset = result.x
Cam.log += '\nFit result: slope = {:.5f} '.format(slope)
Cam.log += 'offset = {}'.format(int(offset))
"""
optional plotting code
"""
if plot:
x = np.linspace(max(g0)*1.1,100)
y = slope*x + offset
plt.title('GEQ Asymmetric \'Upper Bound\' Fit')
plt.plot(x,y,color='red',ls='--',label='fit')
plt.scatter(g0,gdiff,color='b',label='data')
plt.ylabel('Difference in green channels')
plt.xlabel('Green value')
"""
This upper bound asymmetric gives correct order of magnitude values.
The pipeline approximates a 1st derivative of a gaussian with some
linear piecewise functions, introducing arbitrary cutoffs. For
pessimistic geq, the model parameters have been increased by a
scaling factor/constant.
Feel free to tune these or edit the json files directly if you
belive there are still mazing effects left (threshold too low) or if you
think it is being overcorrected (threshold too high).
We have gone for a one size fits most approach that will produce
acceptable results in most applications.
"""
slope *= 1.5
offset += 201
Cam.log += '\nFit after correction factors: slope = {:.5f}'.format(slope)
Cam.log += ' offset = {}'.format(int(offset))
"""
clamp offset at 0 due to pipeline considerations
"""
if offset < 0:
Cam.log += '\nOffset raised to 0'
offset = 0
"""
optional plotting code
"""
if plot:
y2 = slope*x + offset
plt.plot(x,y2,color='green',ls='--',label='scaled fit')
plt.grid()
plt.legend()
plt.show()
"""
the case where for some reason the fit didn't work correctly
Transpose data and then least squares linear fit. Transposing data
makes it robust to many patches where green difference is the same
since they only contribute to one error minimisation, instead of dragging
the entire linear fit down.
"""
else:
print('\nError! Couldn\'t fit asymmetric lest squares')
print(result.message)
Cam.log += '\nWARNING: Asymmetric least squares fit failed! '
Cam.log += 'Standard fit used could possibly lead to worse results'
fit = np.polyfit(gdiff,g0,1)
offset,slope = -fit[1]/fit[0],1/fit[0]
Cam.log += '\nFit result: slope = {:.5f} '.format(slope)
Cam.log += 'offset = {}'.format(int(offset))
"""
optional plotting code
"""
if plot:
x = np.linspace(max(g0)*1.1,100)
y = slope*x + offset
plt.title('GEQ Linear Fit')
plt.plot(x,y,color='red',ls='--',label='fit')
plt.scatter(g0,gdiff,color='b',label='data')
plt.ylabel('Difference in green channels')
plt.xlabel('Green value')
"""
Scaling factors (see previous justification)
The model here will not be an upper bound so scaling factors have
been increased.
This method of deriving geq model parameters is extremely arbitrary
and undesirable.
"""
slope *= 2.5
offset += 301
Cam.log += '\nFit after correction factors: slope = {:.5f}'.format(slope)
Cam.log += ' offset = {}'.format(int(offset))
if offset < 0:
Cam.log += '\nOffset raised to 0'
offset = 0
"""
optional plotting code
"""
if plot:
y2 = slope*x + offset
plt.plot(x,y2,color='green',ls='--',label='scaled fit')
plt.legend()
plt.grid()
plt.show()
return round(slope,5),int(offset)
""""
Return green channels of macbeth patches
returns g0,g1 where
> g0 is green next to red
> g1 is green next to blue
"""
def geq(Cam,Img):
Cam.log += '\nProcessing image {}'.format(Img.name)
patches = [Img.patches[i] for i in Img.order][1:3]
g_patches = np.array([(np.mean(patches[0][i]),np.mean(patches[1][i])) for i in range(24)])
Cam.log += '\n'
return(g_patches)

View file

@ -0,0 +1,428 @@
# SPDX-License-Identifier: BSD-2-Clause
#
# Copyright (C) 2019-2020, Raspberry Pi (Trading) Limited
#
# ctt_image_load.py - camera tuning tool image loading
from ctt_tools import *
from ctt_macbeth_locator import *
import json
import pyexiv2 as pyexif
import rawpy as raw
"""
Image class load image from raw data and extracts metadata.
Once image is extracted from data, it finds 24 16x16 patches for each
channel, centred at the macbeth chart squares
"""
class Image:
def __init__(self,buf):
self.buf = buf
self.patches = None
self.saturated = False
'''
obtain metadata from buffer
'''
def get_meta(self):
self.ver = ba_to_b(self.buf[4:5])
self.w = ba_to_b(self.buf[0xd0:0xd2])
self.h = ba_to_b(self.buf[0xd2:0xd4])
self.pad = ba_to_b(self.buf[0xd4:0xd6])
self.fmt = self.buf[0xf5]
self.sigbits = 2*self.fmt + 4
self.pattern = self.buf[0xf4]
self.exposure = ba_to_b(self.buf[0x90:0x94])
self.againQ8 = ba_to_b(self.buf[0x94:0x96])
self.againQ8_norm = self.againQ8/256
camName = self.buf[0x10:0x10+128]
camName_end = camName.find(0x00)
self.camName = self.buf[0x10:0x10+128][:camName_end].decode()
"""
Channel order depending on bayer pattern
"""
bayer_case = {
0 : (0,1,2,3), #red
1 : (2,0,3,1), #green next to red
2 : (3,2,1,0), #green next to blue
3 : (1,0,3,2), #blue
128 : (0,1,2,3) #arbitrary order for greyscale casw
}
self.order = bayer_case[self.pattern]
'''
manual blacklevel - not robust
'''
if 'ov5647' in self.camName:
self.blacklevel = 16
else:
self.blacklevel = 64
self.blacklevel_16 = self.blacklevel << (6)
return 1
'''
print metadata for debug
'''
def print_meta(self):
print('\nData:')
print(' ver = {}'.format(self.ver))
print(' w = {}'.format(self.w))
print(' h = {}'.format(self.h))
print(' pad = {}'.format(self.pad))
print(' fmt = {}'.format(self.fmt))
print(' sigbits = {}'.format(self.sigbits))
print(' pattern = {}'.format(self.pattern))
print(' exposure = {}'.format(self.exposure))
print(' againQ8 = {}'.format(self.againQ8))
print(' againQ8_norm = {}'.format(self.againQ8_norm))
print(' camName = {}'.format(self.camName))
print(' blacklevel = {}'.format(self.blacklevel))
print(' blacklevel_16 = {}'.format(self.blacklevel_16))
return 1
"""
get image from raw scanline data
"""
def get_image(self,raw):
self.dptr = []
"""
check if data is 10 or 12 bits
"""
if self.sigbits == 10:
"""
calc length of scanline
"""
lin_len = ((((((self.w+self.pad+3)>>2)) * 5)+31)>>5) * 32
"""
stack scan lines into matrix
"""
raw = np.array(raw).reshape(-1,lin_len).astype(np.int64)[:self.h,...]
"""
separate 5 bits in each package, stopping when w is satisfied
"""
ba0 = raw[...,0:5*((self.w+3)>>2):5]
ba1 = raw[...,1:5*((self.w+3)>>2):5]
ba2 = raw[...,2:5*((self.w+3)>>2):5]
ba3 = raw[...,3:5*((self.w+3)>>2):5]
ba4 = raw[...,4:5*((self.w+3)>>2):5]
"""
assemble 10 bit numbers
"""
ch0 = np.left_shift((np.left_shift(ba0,2) + (ba4%4)),6)
ch1 = np.left_shift((np.left_shift(ba1,2) + (np.right_shift(ba4,2)%4)),6)
ch2 = np.left_shift((np.left_shift(ba2,2) + (np.right_shift(ba4,4)%4)),6)
ch3 = np.left_shift((np.left_shift(ba3,2) + (np.right_shift(ba4,6)%4)),6)
"""
interleave bits
"""
mat = np.empty((self.h,self.w),dtype=ch0.dtype)
mat[...,0::4] = ch0
mat[...,1::4] = ch1
mat[...,2::4] = ch2
mat[...,3::4] = ch3
"""
There is som eleaking memory somewhere in the code. This code here
seemed to make things good enough that the code would run for
reasonable numbers of images, however this is techincally just a
workaround. (sorry)
"""
ba0,ba1,ba2,ba3,ba4 = None,None,None,None,None
del ba0,ba1,ba2,ba3,ba4
ch0,ch1,ch2,ch3 = None,None,None,None
del ch0,ch1,ch2,ch3
"""
same as before but 12 bit case
"""
elif self.sigbits == 12:
lin_len = ((((((self.w+self.pad+1)>>1)) * 3)+31)>>5) * 32
raw = np.array(raw).reshape(-1,lin_len).astype(np.int64)[:self.h,...]
ba0 = raw[...,0:3*((self.w+1)>>1):3]
ba1 = raw[...,1:3*((self.w+1)>>1):3]
ba2 = raw[...,2:3*((self.w+1)>>1):3]
ch0 = np.left_shift((np.left_shift(ba0,4) + ba2%16),4)
ch1 = np.left_shift((np.left_shift(ba1,4) + (np.right_shift(ba2,4))%16),4)
mat = np.empty((self.h,self.w),dtype=ch0.dtype)
mat[...,0::2] = ch0
mat[...,1::2] = ch1
else:
"""
data is neither 10 nor 12 or incorrect data
"""
print('ERROR: wrong bit format, only 10 or 12 bit supported')
return 0
"""
separate bayer channels
"""
c0 = mat[0::2,0::2]
c1 = mat[0::2,1::2]
c2 = mat[1::2,0::2]
c3 = mat[1::2,1::2]
self.channels = [c0,c1,c2,c3]
return 1
"""
obtain 16x16 patch centred at macbeth square centre for each channel
"""
def get_patches(self,cen_coords,size=16):
"""
obtain channel widths and heights
"""
ch_w,ch_h = self.w,self.h
cen_coords = list(np.array((cen_coords[0])).astype(np.int32))
self.cen_coords = cen_coords
"""
squares are ordered by stacking macbeth chart columns from
left to right. Some useful patch indices:
white = 3
black = 23
'reds' = 9,10
'blues' = 2,5,8,20,22
'greens' = 6,12,17
greyscale = 3,7,11,15,19,23
"""
all_patches = []
for ch in self.channels:
ch_patches = []
for cen in cen_coords:
'''
macbeth centre is placed at top left of central 2x2 patch
to account for rounding
Patch pixels are sorted by pixel brightness so spatial
information is lost.
'''
patch = ch[cen[1]-7:cen[1]+9,cen[0]-7:cen[0]+9].flatten()
patch.sort()
if patch[-5] == (2**self.sigbits-1)*2**(16-self.sigbits):
self.saturated = True
ch_patches.append(patch)
# print('\nNew Patch\n')
all_patches.append(ch_patches)
# print('\n\nNew Channel\n\n')
self.patches = all_patches
return 1
def brcm_load_image(Cam, im_str):
"""
Load image where raw data and metadata is in the BRCM format
"""
try:
"""
create byte array
"""
with open(im_str,'rb') as image:
f = image.read()
b = bytearray(f)
"""
return error if incorrect image address
"""
except FileNotFoundError:
print('\nERROR:\nInvalid image address')
Cam.log += '\nWARNING: Invalid image address'
return 0
"""
return error if problem reading file
"""
if f == None:
print('\nERROR:\nProblem reading file')
Cam.log += '\nWARNING: Problem readin file'
return 0
# print('\nLooking for EOI and BRCM header')
"""
find end of image followed by BRCM header by turning
bytearray into hex string and string matching with regexp
"""
start = -1
match = bytearray(b'\xff\xd9@BRCM')
match_str = binascii.hexlify(match)
b_str = binascii.hexlify(b)
"""
note index is divided by two to go from string to hex
"""
indices = [m.start()//2 for m in re.finditer(match_str,b_str)]
# print(indices)
try:
start = indices[0] + 3
except IndexError:
print('\nERROR:\nNo Broadcom header found')
Cam.log += '\nWARNING: No Broadcom header found!'
return 0
"""
extract data after header
"""
# print('\nExtracting data after header')
buf = b[start:start+32768]
Img = Image(buf)
Img.str = im_str
# print('Data found successfully')
"""
obtain metadata
"""
# print('\nReading metadata')
Img.get_meta()
Cam.log += '\nExposure : {} us'.format(Img.exposure)
Cam.log += '\nNormalised gain : {}'.format(Img.againQ8_norm)
# print('Metadata read successfully')
"""
obtain raw image data
"""
# print('\nObtaining raw image data')
raw = b[start+32768:]
Img.get_image(raw)
"""
delete raw to stop memory errors
"""
raw = None
del raw
# print('Raw image data obtained successfully')
return Img
def dng_load_image(Cam, im_str):
try:
Img = Image(None)
# RawPy doesn't load all the image tags that we need, so we use py3exiv2
metadata = pyexif.ImageMetadata(im_str)
metadata.read()
Img.ver = 100 # random value
Img.w = metadata['Exif.SubImage1.ImageWidth'].value
Img.pad = 0
Img.h = metadata['Exif.SubImage1.ImageLength'].value
white = metadata['Exif.SubImage1.WhiteLevel'].value
Img.sigbits = int(white).bit_length()
Img.fmt = (Img.sigbits - 4) // 2
Img.exposure = int(metadata['Exif.Photo.ExposureTime'].value*1000000)
Img.againQ8 = metadata['Exif.Photo.ISOSpeedRatings'].value*256/100
Img.againQ8_norm = Img.againQ8 / 256
Img.camName = metadata['Exif.Image.Model'].value
Img.blacklevel = int(metadata['Exif.SubImage1.BlackLevel'].value[0])
Img.blacklevel_16 = Img.blacklevel << (16 - Img.sigbits)
bayer_case = {
'0 1 1 2': (0, (0, 1, 2, 3)),
'1 2 0 1': (1, (2, 0, 3, 1)),
'2 1 1 0': (2, (3, 2, 1, 0)),
'1 0 2 1': (3, (1, 0, 3, 2))
}
cfa_pattern = metadata['Exif.SubImage1.CFAPattern'].value
Img.pattern = bayer_case[cfa_pattern][0]
Img.order = bayer_case[cfa_pattern][1]
# Now use RawPy tp get the raw Bayer pixels
raw_im = raw.imread(im_str)
raw_data = raw_im.raw_image
shift = 16 - Img.sigbits
c0 = np.left_shift(raw_data[0::2,0::2].astype(np.int64), shift)
c1 = np.left_shift(raw_data[0::2,1::2].astype(np.int64), shift)
c2 = np.left_shift(raw_data[1::2,0::2].astype(np.int64), shift)
c3 = np.left_shift(raw_data[1::2,1::2].astype(np.int64), shift)
Img.channels = [c0, c1, c2, c3]
except:
print("\nERROR: failed to load DNG file", im_str)
print("Either file does not exist or is incompatible")
Cam.log += '\nERROR: DNG file does not exist or is incompatible'
raise
return Img
'''
load image from file location and perform calibration
check correct filetype
mac boolean is true if image is expected to contain macbeth chart and false
if not (alsc images don't have macbeth charts)
'''
def load_image(Cam,im_str,mac_config=None,show=False,mac=True,show_meta=False):
"""
check image is correct filetype
"""
if '.jpg' in im_str or '.jpeg' in im_str or '.brcm' in im_str or '.dng' in im_str:
if '.dng' in im_str:
Img = dng_load_image(Cam, im_str)
else:
Img = brcm_load_image(Cam, im_str)
if show_meta:
Img.print_meta()
if mac:
"""
find macbeth centres, discarding images that are too dark or light
"""
av_chan = (np.mean(np.array(Img.channels),axis=0)/(2**16))
av_val = np.mean(av_chan)
# print(av_val)
if av_val < Img.blacklevel_16/(2**16)+1/64:
macbeth = None
print('\nError: Image too dark!')
Cam.log += '\nWARNING: Image too dark!'
else:
macbeth = find_macbeth(Cam,av_chan,mac_config)
"""
if no macbeth found return error
"""
if macbeth == None:
print('\nERROR: No macbeth chart found')
return 0
mac_cen_coords = macbeth[1]
# print('\nMacbeth centres located successfully')
"""
obtain image patches
"""
# print('\nObtaining image patches')
Img.get_patches(mac_cen_coords)
if Img.saturated:
print('\nERROR: Macbeth patches have saturated')
Cam.log += '\nWARNING: Macbeth patches have saturated!'
return 0
"""
clear memory
"""
Img.buf = None
del Img.buf
# print('Image patches obtained successfully')
"""
optional debug
"""
if show and __name__ == '__main__':
copy = sum(Img.channels)/2**18
copy = np.reshape(copy,(Img.h//2,Img.w//2)).astype(np.float64)
copy,_ = reshape(copy,800)
represent(copy)
return Img
"""
return error if incorrect filetype
"""
else:
# print('\nERROR:\nInvalid file extension')
return 0
"""
bytearray splice to number little endian
"""
def ba_to_b(b):
total = 0
for i in range(len(b)):
total += 256**i * b[i]
return total

View file

@ -0,0 +1,58 @@
# SPDX-License-Identifier: BSD-2-Clause
#
# Copyright (C) 2019, Raspberry Pi (Trading) Limited
#
# ctt_lux.py - camera tuning tool for lux level
from ctt_tools import *
"""
Find lux values from metadata and calculate Y
"""
def lux(Cam,Img):
shutter_speed = Img.exposure
gain = Img.againQ8_norm
aperture = 1
Cam.log += '\nShutter speed = {}'.format(shutter_speed)
Cam.log += '\nGain = {}'.format(gain)
Cam.log += '\nAperture = {}'.format(aperture)
patches = [Img.patches[i] for i in Img.order]
channels = [Img.channels[i] for i in Img.order]
return lux_calc(Cam,Img,patches,channels),shutter_speed,gain
"""
perform lux calibration on bayer channels
"""
def lux_calc(Cam,Img,patches,channels):
"""
find means color channels on grey patches
"""
ap_r = np.mean(patches[0][3::4])
ap_g = (np.mean(patches[1][3::4])+np.mean(patches[2][3::4]))/2
ap_b = np.mean(patches[3][3::4])
Cam.log += '\nAverage channel values on grey patches:'
Cam.log += '\nRed = {:.0f} Green = {:.0f} Blue = {:.0f}'.format(ap_r,ap_b,ap_g)
# print(ap_r,ap_g,ap_b)
"""
calculate channel gains
"""
gr = ap_g/ap_r
gb = ap_g/ap_b
Cam.log += '\nChannel gains: Red = {:.3f} Blue = {:.3f}'.format(gr,gb)
"""
find means color channels on image and scale by gain
note greens are averaged together (treated as one channel)
"""
a_r = np.mean(channels[0])*gr
a_g = (np.mean(channels[1])+np.mean(channels[2]))/2
a_b = np.mean(channels[3])*gb
Cam.log += '\nAverage channel values over entire image scaled by channel gains:'
Cam.log += '\nRed = {:.0f} Green = {:.0f} Blue = {:.0f}'.format(a_r,a_b,a_g)
# print(a_r,a_g,a_b)
"""
Calculate y with top row of yuv matrix
"""
y = 0.299*a_r + 0.587*a_g + 0.114*a_b
Cam.log += '\nY value calculated: {}'.format(int(y))
# print(y)
return int(y)

View file

@ -0,0 +1,748 @@
# SPDX-License-Identifier: BSD-2-Clause
#
# Copyright (C) 2019, Raspberry Pi (Trading) Limited
#
# ctt_macbeth_locator.py - camera tuning tool Macbeth chart locator
from ctt_ransac import *
from ctt_tools import *
import warnings
"""
NOTE: some custom functions have been used here to make the code more readable.
These are defined in tools.py if they are needed for reference.
"""
"""
Some inconsistencies between packages cause runtime warnings when running
the clustering algorithm. This catches these warnings so they don't flood the
output to the console
"""
def fxn():
warnings.warn("runtime",RuntimeWarning)
"""
Define the success message
"""
success_msg = 'Macbeth chart located successfully'
def find_macbeth(Cam,img,mac_config=(0,0)):
small_chart,show = mac_config
print('Locating macbeth chart')
Cam.log += '\nLocating macbeth chart'
"""
catch the warnings
"""
warnings.simplefilter("ignore")
fxn()
"""
Reference macbeth chart is created that will be correlated with the located
macbeth chart guess to produce a confidence value for the match.
"""
ref = cv2.imread(Cam.path +'ctt_ref.pgm',flags=cv2.IMREAD_GRAYSCALE)
ref_w = 120
ref_h = 80
rc1 = (0,0)
rc2 = (0,ref_h)
rc3 = (ref_w,ref_h)
rc4 = (ref_w,0)
ref_corns = np.array((rc1,rc2,rc3,rc4),np.float32)
ref_data = (ref,ref_w,ref_h,ref_corns)
"""
locate macbeth chart
"""
cor,mac,coords,msg = get_macbeth_chart(img,ref_data)
"""
following bits of code tries to fix common problems with simple
techniques.
If now or at any point the best correlation is of above 0.75, then
nothing more is tried as this is a high enough confidence to ensure
reliable macbeth square centre placement.
"""
"""
brighten image 2x
"""
if cor < 0.75:
a = 2
img_br = cv2.convertScaleAbs(img,alpha=a,beta=0)
cor_b,mac_b,coords_b,msg_b = get_macbeth_chart(img_br,ref_data)
if cor_b > cor:
cor,mac,coords,msg = cor_b,mac_b,coords_b,msg_b
"""
brighten image 4x
"""
if cor < 0.75:
a = 4
img_br = cv2.convertScaleAbs(img,alpha=a,beta=0)
cor_b,mac_b,coords_b,msg_b = get_macbeth_chart(img_br,ref_data)
if cor_b > cor:
cor,mac,coords,msg = cor_b,mac_b,coords_b,msg_b
"""
In case macbeth chart is too small, take a selection of the image and
attempt to locate macbeth chart within that. The scale increment is
root 2
"""
"""
These variables will be used to transform the found coordinates at smaller
scales back into the original. If ii is still -1 after this section that
means it was not successful
"""
ii = -1
w_best = 0
h_best = 0
d_best = 100
"""
d_best records the scale of the best match. Macbeth charts are only looked
for at one scale increment smaller than the current best match in order to avoid
unecessarily searching for macbeth charts at small scales.
If a macbeth chart ha already been found then set d_best to 0
"""
if cor != 0:
d_best = 0
"""
scale 3/2 (approx root2)
"""
if cor < 0.75:
imgs = []
"""
get size of image
"""
shape = list(img.shape[:2])
w,h = shape
"""
set dimensions of the subselection and the step along each axis between
selections
"""
w_sel = int(2*w/3)
h_sel = int(2*h/3)
w_inc = int(w/6)
h_inc = int(h/6)
"""
for each subselection, look for a macbeth chart
"""
for i in range(3):
for j in range(3):
w_s,h_s = i*w_inc,j*h_inc
img_sel = img[w_s:w_s+w_sel,h_s:h_s+h_sel]
cor_ij,mac_ij,coords_ij,msg_ij = get_macbeth_chart(img_sel,ref_data)
"""
if the correlation is better than the best then record the
scale and current subselection at which macbeth chart was
found. Also record the coordinates, macbeth chart and message.
"""
if cor_ij > cor:
cor = cor_ij
mac,coords,msg = mac_ij,coords_ij,msg_ij
ii,jj = i,j
w_best,h_best = w_inc,h_inc
d_best = 1
"""
scale 2
"""
if cor < 0.75:
imgs = []
shape = list(img.shape[:2])
w,h = shape
w_sel = int(w/2)
h_sel = int(h/2)
w_inc = int(w/8)
h_inc = int(h/8)
for i in range(5):
for j in range(5):
w_s,h_s = i*w_inc,j*h_inc
img_sel = img[w_s:w_s+w_sel,h_s:h_s+h_sel]
cor_ij,mac_ij,coords_ij,msg_ij = get_macbeth_chart(img_sel,ref_data)
if cor_ij > cor:
cor = cor_ij
mac,coords,msg = mac_ij,coords_ij,msg_ij
ii,jj = i,j
w_best,h_best = w_inc,h_inc
d_best = 2
"""
The following code checks for macbeth charts at even smaller scales. This
slows the code down significantly and has therefore been omitted by default,
however it is not unusably slow so might be useful if the macbeth chart
is too small to be picked up to by the current subselections.
Use this for macbeth charts with side lengths around 1/5 image dimensions
(and smaller...?) it is, however, recommended that macbeth charts take up as
large as possible a proportion of the image.
"""
if small_chart:
if cor < 0.75 and d_best > 1 :
imgs = []
shape = list(img.shape[:2])
w,h = shape
w_sel = int(w/3)
h_sel = int(h/3)
w_inc = int(w/12)
h_inc = int(h/12)
for i in range(9):
for j in range(9):
w_s,h_s = i*w_inc,j*h_inc
img_sel = img[w_s:w_s+w_sel,h_s:h_s+h_sel]
cor_ij,mac_ij,coords_ij,msg_ij = get_macbeth_chart(img_sel,ref_data)
if cor_ij > cor:
cor = cor_ij
mac,coords,msg = mac_ij,coords_ij,msg_ij
ii,jj = i,j
w_best,h_best = w_inc,h_inc
d_best = 3
if cor < 0.75 and d_best > 2:
imgs = []
shape = list(img.shape[:2])
w,h = shape
w_sel = int(w/4)
h_sel = int(h/4)
w_inc = int(w/16)
h_inc = int(h/16)
for i in range(13):
for j in range(13):
w_s,h_s = i*w_inc,j*h_inc
img_sel = img[w_s:w_s+w_sel,h_s:h_s+h_sel]
cor_ij,mac_ij,coords_ij,msg_ij = get_macbeth_chart(img_sel,ref_data)
if cor_ij > cor:
cor = cor_ij
mac,coords,msg = mac_ij,coords_ij,msg_ij
ii,jj = i,j
w_best,h_best = w_inc,h_inc
"""
Transform coordinates from subselection to original image
"""
if ii != -1:
for a in range(len(coords)):
for b in range(len(coords[a][0])):
coords[a][0][b][1] += ii*w_best
coords[a][0][b][0] += jj*h_best
"""
initialise coords_fit variable
"""
coords_fit = None
# print('correlation: {}'.format(cor))
"""
print error or success message
"""
print(msg)
Cam.log += '\n' + msg
if msg == success_msg:
coords_fit = coords
Cam.log += '\nMacbeth chart vertices:\n'
Cam.log += '{}'.format(2*np.round(coords_fit[0][0]),0)
"""
if correlation is lower than 0.75 there may be a risk of macbeth chart
corners not having been located properly. It might be worth running
with show set to true to check where the macbeth chart centres have
been located.
"""
print('Confidence: {:.3f}'.format(cor))
Cam.log += '\nConfidence: {:.3f}'.format(cor)
if cor < 0.75:
print('Caution: Low confidence guess!')
Cam.log += 'WARNING: Low confidence guess!'
# cv2.imshow('MacBeth',mac)
# represent(mac,'MacBeth chart')
"""
extract data from coords_fit and plot on original image
"""
if show and coords_fit != None:
copy = img.copy()
verts = coords_fit[0][0]
cents = coords_fit[1][0]
"""
draw circles at vertices of macbeth chart
"""
for vert in verts:
p = tuple(np.round(vert).astype(np.int32))
cv2.circle(copy,p,10,1,-1)
"""
draw circles at centres of squares
"""
for i in range(len(cents)):
cent = cents[i]
p = tuple(np.round(cent).astype(np.int32))
"""
draw black circle on white square, white circle on black square an
grey circle everywhere else.
"""
if i == 3:
cv2.circle(copy,p,8,0,-1)
elif i == 23:
cv2.circle(copy,p,8,1,-1)
else:
cv2.circle(copy,p,8,0.5,-1)
copy,_ = reshape(copy,400)
represent(copy)
return(coords_fit)
def get_macbeth_chart(img,ref_data):
"""
function returns coordinates of macbeth chart vertices and square centres,
along with an error/success message for debugging purposes. Additionally,
it scores the match with a confidence value.
Brief explanation of the macbeth chart locating algorithm:
- Find rectangles within image
- Take rectangles within percentage offset of median perimeter. The
assumption is that these will be the macbeth squares
- For each potential square, find the 24 possible macbeth centre locations
that would produce a square in that location
- Find clusters of potential macbeth chart centres to find the potential
macbeth centres with the most votes, i.e. the most likely ones
- For each potential macbeth centre, use the centres of the squares that
voted for it to find macbeth chart corners
- For each set of corners, transform the possible match into normalised
space and correlate with a reference chart to evaluate the match
- Select the highest correlation as the macbeth chart match, returning the
correlation as the confidence score
"""
"""
get reference macbeth chart data
"""
(ref,ref_w,ref_h,ref_corns) = ref_data
"""
the code will raise and catch a MacbethError in case of a problem, trying
to give some likely reasons why the problem occred, hence the try/except
"""
try:
"""
obtain image, convert to grayscale and normalise
"""
src = img
src,factor = reshape(src,200)
original=src.copy()
a = 125/np.average(src)
src_norm = cv2.convertScaleAbs(src,alpha=a,beta=0)
"""
This code checks if there are seperate colour channels. In the past the
macbeth locator ran on jpgs and this makes it robust to different
filetypes. Note that running it on a jpg has 4x the pixels of the
average bayer channel so coordinates must be doubled.
This is best done in img_load.py in the get_patches method. The
coordinates and image width,height must be divided by two if the
macbeth locator has been run on a demosaicked image.
"""
if len(src_norm.shape) == 3:
src_bw = cv2.cvtColor(src_norm,cv2.COLOR_BGR2GRAY)
else:
src_bw = src_norm
original_bw = src_bw.copy()
"""
obtain image edges
"""
sigma=2
src_bw = cv2.GaussianBlur(src_bw,(0,0),sigma)
t1,t2 = 50,100
edges = cv2.Canny(src_bw,t1,t2)
"""
dilate edges to prevent self-intersections in contours
"""
k_size = 2
kernel = np.ones((k_size,k_size))
its = 1
edges = cv2.dilate(edges,kernel,iterations=its)
"""
find Contours in image
"""
conts,_ = cv2.findContours(edges,
cv2.RETR_TREE,
cv2.CHAIN_APPROX_NONE)
if len(conts) == 0:
raise MacbethError(
'\nWARNING: No macbeth chart found!'
'\nNo contours found in image\n'
'Possible problems:\n'
'- Macbeth chart is too dark or bright\n'
'- Macbeth chart is occluded\n'
)
"""
find quadrilateral contours
"""
epsilon = 0.07
conts_per = []
for i in range(len(conts)):
per = cv2.arcLength(conts[i],True)
poly = cv2.approxPolyDP(conts[i],
epsilon*per,True)
if len(poly) == 4 and cv2.isContourConvex(poly):
conts_per.append((poly,per))
if len(conts_per) == 0:
raise MacbethError(
'\nWARNING: No macbeth chart found!'
'\nNo quadrilateral contours found'
'\nPossible problems:\n'
'- Macbeth chart is too dark or bright\n'
'- Macbeth chart is occluded\n'
'- Macbeth chart is out of camera plane\n'
)
"""
sort contours by perimeter and get perimeters within percent of median
"""
conts_per = sorted(conts_per,key=lambda x:x[1])
med_per = conts_per[int(len(conts_per)/2)][1]
side = med_per/4
perc = 0.1
med_low,med_high = med_per*(1-perc),med_per*(1+perc)
squares = []
for i in conts_per:
if med_low <= i[1] and med_high >= i[1]:
squares.append(i[0])
"""
obtain coordinates of nomralised macbeth and squares
"""
square_verts,mac_norm = get_square_verts(0.06)
"""
for each square guess, find 24 possible macbeth chart centres
"""
mac_mids = []
squares_raw = []
for i in range(len(squares)):
square = squares[i]
squares_raw.append(square)
"""
convert quads to rotated rectangles. This is required as the
'squares' are usually quite irregular quadrilaterls, so performing
a transform would result in exaggerated warping and inaccurate
macbeth chart centre placement
"""
rect = cv2.minAreaRect(square)
square = cv2.boxPoints(rect).astype(np.float32)
"""
reorder vertices to prevent 'hourglass shape'
"""
square = sorted(square,key=lambda x:x[0])
square_1 = sorted(square[:2],key=lambda x:x[1])
square_2 = sorted(square[2:],key=lambda x:-x[1])
square = np.array(np.concatenate((square_1,square_2)),np.float32)
square = np.reshape(square,(4,2)).astype(np.float32)
squares[i] = square
"""
find 24 possible macbeth chart centres by trasnforming normalised
macbeth square vertices onto candidate square vertices found in image
"""
for j in range(len(square_verts)):
verts = square_verts[j]
p_mat = cv2.getPerspectiveTransform(verts,square)
mac_guess = cv2.perspectiveTransform(mac_norm,p_mat)
mac_guess = np.round(mac_guess).astype(np.int32)
"""
keep only if candidate macbeth is within image border
(deprecated)
"""
in_border = True
# for p in mac_guess[0]:
# pptest = cv2.pointPolygonTest(
# img_con,
# tuple(p),
# False
# )
# if pptest == -1:
# in_border = False
# break
if in_border:
mac_mid = np.mean(mac_guess,
axis=1)
mac_mids.append([mac_mid,(i,j)])
if len(mac_mids) == 0:
raise MacbethError(
'\nWARNING: No macbeth chart found!'
'\nNo possible macbeth charts found within image'
'\nPossible problems:\n'
'- Part of the macbeth chart is outside the image\n'
'- Quadrilaterals in image background\n'
)
"""
reshape data
"""
for i in range(len(mac_mids)):
mac_mids[i][0] = mac_mids[i][0][0]
"""
find where midpoints cluster to identify most likely macbeth centres
"""
clustering = cluster.AgglomerativeClustering(
n_clusters=None,
compute_full_tree = True,
distance_threshold = side*2
)
mac_mids_list = [x[0] for x in mac_mids]
if len(mac_mids_list)==1:
"""
special case of only one valid centre found (probably not needed)
"""
clus_list = []
clus_list.append([mac_mids,len(mac_mids)])
else:
clustering.fit(mac_mids_list)
# try:
# clustering.fit(mac_mids_list)
# except RuntimeWarning as error:
# return(0,None,None,error)
"""
create list of all clusters
"""
clus_list = []
if clustering.n_clusters_ >1:
for i in range(clustering.labels_.max()+1):
indices = [j for j,x in enumerate(clustering.labels_) if x == i]
clus = []
for index in indices:
clus.append(mac_mids[index])
clus_list.append([clus,len(clus)])
clus_list.sort(key=lambda x:-x[1])
elif clustering.n_clusters_ == 1:
"""
special case of only one cluster found
"""
# print('only 1 cluster')
clus_list.append([mac_mids,len(mac_mids)])
else:
raise MacbethError(
'\nWARNING: No macebth chart found!'
'\nNo clusters found'
'\nPossible problems:\n'
'- NA\n'
)
"""
keep only clusters with enough votes
"""
clus_len_max = clus_list[0][1]
clus_tol= 0.7
for i in range(len(clus_list)):
if clus_list[i][1] < clus_len_max * clus_tol:
clus_list = clus_list[:i]
break
cent = np.mean(clus_list[i][0],axis=0)[0]
clus_list[i].append(cent)
"""
represent most popular cluster centroids
"""
# copy = original_bw.copy()
# copy = cv2.cvtColor(copy,cv2.COLOR_GRAY2RGB)
# copy = cv2.resize(copy,None,fx=2,fy=2)
# for clus in clus_list:
# centroid = tuple(2*np.round(clus[2]).astype(np.int32))
# cv2.circle(copy,centroid,7,(255,0,0),-1)
# cv2.circle(copy,centroid,2,(0,0,255),-1)
# represent(copy)
"""
get centres of each normalised square
"""
reference = get_square_centres(0.06)
"""
for each possible macbeth chart, transform image into
normalised space and find correlation with reference
"""
max_cor = 0
best_map = None
best_fit = None
best_cen_fit = None
best_ref_mat = None
for clus in clus_list:
clus = clus[0]
sq_cents = []
ref_cents = []
i_list = [p[1][0] for p in clus]
for point in clus:
i,j = point[1]
"""
remove any square that voted for two different points within
the same cluster. This causes the same point in the image to be
mapped to two different reference square centres, resulting in
a very distorted perspective transform since cv2.findHomography
simply minimises error.
This phenomenon is not particularly likely to occur due to the
enforced distance threshold in the clustering fit but it is
best to keep this in just in case.
"""
if i_list.count(i) == 1:
square = squares_raw[i]
sq_cent = np.mean(square,axis=0)
ref_cent = reference[j]
sq_cents.append(sq_cent)
ref_cents.append(ref_cent)
"""
At least three squares need to have voted for a centre in
order for a transform to be found
"""
if len(sq_cents) < 3:
raise MacbethError(
'\nWARNING: No macbeth chart found!'
'\nNot enough squares found'
'\nPossible problems:\n'
'- Macbeth chart is occluded\n'
'- Macbeth chart is too dark of bright\n'
)
ref_cents = np.array(ref_cents)
sq_cents = np.array(sq_cents)
"""
find best fit transform from normalised centres to image
"""
h_mat,mask = cv2.findHomography(ref_cents,sq_cents)
if 'None' in str(type(h_mat)):
raise MacbethError(
'\nERROR\n'
)
"""
transform normalised corners and centres into image space
"""
mac_fit = cv2.perspectiveTransform(mac_norm,h_mat)
mac_cen_fit = cv2.perspectiveTransform(np.array([reference]),h_mat)
"""
transform located corners into reference space
"""
ref_mat = cv2.getPerspectiveTransform(
mac_fit,
np.array([ref_corns])
)
map_to_ref = cv2.warpPerspective(
original_bw,ref_mat,
(ref_w,ref_h)
)
"""
normalise brigthness
"""
a = 125/np.average(map_to_ref)
map_to_ref = cv2.convertScaleAbs(map_to_ref,alpha=a,beta=0)
"""
find correlation with bw reference macbeth
"""
cor = correlate(map_to_ref,ref)
"""
keep only if best correlation
"""
if cor > max_cor:
max_cor = cor
best_map = map_to_ref
best_fit = mac_fit
best_cen_fit = mac_cen_fit
best_ref_mat = ref_mat
"""
rotate macbeth by pi and recorrelate in case macbeth chart is
upside-down
"""
mac_fit_inv = np.array(
([[mac_fit[0][2],mac_fit[0][3],
mac_fit[0][0],mac_fit[0][1]]])
)
mac_cen_fit_inv = np.flip(mac_cen_fit,axis=1)
ref_mat = cv2.getPerspectiveTransform(
mac_fit_inv,
np.array([ref_corns])
)
map_to_ref = cv2.warpPerspective(
original_bw,ref_mat,
(ref_w,ref_h)
)
a = 125/np.average(map_to_ref)
map_to_ref = cv2.convertScaleAbs(map_to_ref,alpha=a,beta=0)
cor = correlate(map_to_ref,ref)
if cor > max_cor:
max_cor = cor
best_map = map_to_ref
best_fit = mac_fit_inv
best_cen_fit = mac_cen_fit_inv
best_ref_mat = ref_mat
"""
Check best match is above threshold
"""
cor_thresh = 0.6
if max_cor < cor_thresh:
raise MacbethError(
'\nWARNING: Correlation too low'
'\nPossible problems:\n'
'- Bad lighting conditions\n'
'- Macbeth chart is occluded\n'
'- Background is too noisy\n'
'- Macbeth chart is out of camera plane\n'
)
"""
Following code is mostly representation for debugging purposes
"""
"""
draw macbeth corners and centres on image
"""
copy = original.copy()
copy = cv2.resize(original,None,fx=2,fy=2)
# print('correlation = {}'.format(round(max_cor,2)))
for point in best_fit[0]:
point = np.array(point,np.float32)
point = tuple(2*np.round(point).astype(np.int32))
cv2.circle(copy,point,4,(255,0,0),-1)
for point in best_cen_fit[0]:
point = np.array(point,np.float32)
point = tuple(2*np.round(point).astype(np.int32))
cv2.circle(copy,point,4,(0,0,255),-1)
copy = copy.copy()
cv2.circle(copy,point,4,(0,0,255),-1)
"""
represent coloured macbeth in reference space
"""
best_map_col = cv2.warpPerspective(
original,best_ref_mat,(ref_w,ref_h)
)
best_map_col = cv2.resize(
best_map_col,None,fx=4,fy=4
)
a = 125/np.average(best_map_col)
best_map_col_norm = cv2.convertScaleAbs(
best_map_col,alpha=a,beta=0
)
# cv2.imshow('Macbeth',best_map_col)
# represent(copy)
"""
rescale coordinates to original image size
"""
fit_coords = (best_fit/factor,best_cen_fit/factor)
return(max_cor,best_map_col_norm,fit_coords,success_msg)
"""
catch macbeth errors and continue with code
"""
except MacbethError as error:
return(0,None,None,error)

View file

@ -0,0 +1,123 @@
# SPDX-License-Identifier: BSD-2-Clause
#
# Copyright (C) 2019, Raspberry Pi (Trading) Limited
#
# ctt_noise.py - camera tuning tool noise calibration
from ctt_image_load import *
import matplotlib.pyplot as plt
"""
Find noise standard deviation and fit to model:
noise std = a + b*sqrt(pixel mean)
"""
def noise(Cam,Img,plot):
Cam.log += '\nProcessing image: {}'.format(Img.name)
stds = []
means = []
"""
iterate through macbeth square patches
"""
for ch_patches in Img.patches:
for patch in ch_patches:
"""
renormalise patch
"""
patch = np.array(patch)
patch = (patch-Img.blacklevel_16)/Img.againQ8_norm
std = np.std(patch)
mean = np.mean(patch)
stds.append(std)
means.append(mean)
"""
clean data and ensure all means are above 0
"""
stds = np.array(stds)
means = np.array(means)
means = np.clip(np.array(means),0,None)
sq_means = np.sqrt(means)
"""
least squares fit model
"""
fit = np.polyfit(sq_means,stds,1)
Cam.log += '\nBlack level = {}'.format(Img.blacklevel_16)
Cam.log += '\nNoise profile: offset = {}'.format(int(fit[1]))
Cam.log += ' slope = {:.3f}'.format(fit[0])
"""
remove any values further than std from the fit
anomalies most likely caused by:
> ucharacteristically noisy white patch
> saturation in the white patch
"""
fit_score = np.abs(stds - fit[0]*sq_means - fit[1])
fit_std = np.std(stds)
fit_score_norm = fit_score - fit_std
anom_ind = np.where(fit_score_norm > 1)
fit_score_norm.sort()
sq_means_clean = np.delete(sq_means,anom_ind)
stds_clean = np.delete(stds,anom_ind)
removed = len(stds) - len(stds_clean)
if removed != 0:
Cam.log += '\nIdentified and removed {} anomalies.'.format(removed)
Cam.log += '\nRecalculating fit'
"""
recalculate fit with outliers removed
"""
fit = np.polyfit(sq_means_clean,stds_clean,1)
Cam.log += '\nNoise profile: offset = {}'.format(int(fit[1]))
Cam.log += ' slope = {:.3f}'.format(fit[0])
"""
if fit const is < 0 then force through 0 by
dividing by sq_means and fitting poly order 0
"""
corrected = 0
if fit[1] < 0:
corrected = 1
ones = np.ones(len(means))
y_data = stds/sq_means
fit2 = np.polyfit(ones,y_data,0)
Cam.log += '\nOffset below zero. Fit recalculated with zero offset'
Cam.log += '\nNoise profile: offset = 0'
Cam.log += ' slope = {:.3f}'.format(fit2[0])
# print('new fit')
# print(fit2)
"""
plot fit for debug
"""
if plot:
x = np.arange(sq_means.max()//0.88)
fit_plot = x*fit[0] + fit[1]
plt.scatter(sq_means,stds,label='data',color='blue')
plt.scatter(sq_means[anom_ind],stds[anom_ind],color='orange',label='anomalies')
plt.plot(x,fit_plot,label='fit',color='red',ls=':')
if fit[1] < 0:
fit_plot_2 = x*fit2[0]
plt.plot(x,fit_plot_2,label='fit 0 intercept',color='green',ls='--')
plt.plot(0,0)
plt.title('Noise Plot\nImg: {}'.format(Img.str))
plt.legend(loc = 'upper left')
plt.xlabel('Sqrt Pixel Value')
plt.ylabel('Noise Standard Deviation')
plt.grid()
plt.show()
"""
End of plotting code
"""
"""
format output to include forced 0 constant
"""
Cam.log += '\n'
if corrected:
fit = [fit2[0],0]
return fit
else:
return fit

View file

@ -0,0 +1,70 @@
# SPDX-License-Identifier: BSD-2-Clause
#
# Copyright (C) 2019, Raspberry Pi (Trading) Limited
#
# ctt_pretty_print_json.py - camera tuning tool JSON formatter
"""
takes a collapsed json file and makes it more readable
"""
def process_file(string, fout, state):
for c in string:
process_char(c, fout, state)
def print_newline(fout, state):
fout.write('\n')
fout.write(' '*state["indent"]*4)
def process_char(c, fout, state):
if c == '{':
if not state["skipnewline"]: print_newline(fout, state)
fout.write(c)
state["indent"] += 1
print_newline(fout, state)
elif c == '}':
state["indent"] -= 1
print_newline(fout, state)
fout.write(c)
elif c == '[':
print_newline(fout, state)
fout.write(c)
state["indent"] += 1
print_newline(fout, state)
state["inarray"] = [True] + state["inarray"]
state["arraycount"] = [0] + state["arraycount"]
elif c == ']':
state["indent"] -= 1
print_newline(fout, state)
state["inarray"].pop(0)
state["arraycount"].pop(0)
fout.write(c)
elif c == ':':
fout.write(c)
fout.write(' ')
elif c == ' ':
pass
elif c == ',':
if not state["inarray"][0]:
fout.write(c)
fout.write(' ')
print_newline(fout, state)
else:
fout.write(c)
state["arraycount"][0] += 1
if state["arraycount"][0] == 16:
state["arraycount"][0] = 0
print_newline(fout, state)
else:
fout.write(' ')
else:
fout.write(c)
state["skipnewline"] = (c == '[')
def pretty_print_json(str_in, output_filename):
state = {"indent": 0, "inarray": [False], "arraycount": [], "skipnewline" : True}
with open(output_filename, "w") as fout:
process_file(str_in, fout, state)
if __name__ == '__main__':
pretty_print_json("../ctt/ref_json/final_imx477.json", "pretty.json")

View file

@ -0,0 +1,69 @@
# SPDX-License-Identifier: BSD-2-Clause
#
# Copyright (C) 2019, Raspberry Pi (Trading) Limited
#
# ctt_ransac.py - camera tuning tool RANSAC selector for Macbeth chart locator
import numpy as np
scale = 2
"""
constructs normalised macbeth chart corners for ransac algorithm
"""
def get_square_verts(c_err = 0.05,scale = scale):
"""
define macbeth chart corners
"""
b_bord_x,b_bord_y = scale*8.5,scale*13
s_bord = 6*scale
side = 41*scale
x_max = side*6 + 5*s_bord + 2*b_bord_x
y_max = side*4 + 3*s_bord + 2*b_bord_y
c1 = (0,0)
c2 = (0,y_max)
c3 = (x_max,y_max)
c4 = (x_max,0)
mac_norm = np.array((c1,c2,c3,c4),np.float32)
mac_norm = np.array([ mac_norm ])
square_verts = []
square_0 = np.array(((0,0),(0,side),
(side,side),(side,0)),np.float32)
offset_0 = np.array((b_bord_x,b_bord_y),np.float32)
c_off = side * c_err
offset_cont = np.array(((c_off,c_off),(c_off,-c_off),
(-c_off,-c_off),(-c_off,c_off)),np.float32)
square_0 += offset_0
square_0 += offset_cont
"""
define macbeth square corners
"""
for i in range(6):
shift_i = np.array(((i*side,0),(i*side,0),
(i*side,0),(i*side,0)),np.float32)
shift_bord =np.array(((i*s_bord,0),(i*s_bord,0),
(i*s_bord,0),(i*s_bord,0)),np.float32)
square_i = square_0 + shift_i + shift_bord
for j in range(4):
shift_j = np.array(((0,j*side),(0,j*side),
(0,j*side),(0,j*side)),np.float32)
shift_bord = np.array(((0,j*s_bord),
(0,j*s_bord),(0,j*s_bord),
(0,j*s_bord)),np.float32)
square_j = square_i + shift_j + shift_bord
square_verts.append(square_j)
# print('square_verts')
# print(square_verts)
return np.array(square_verts,np.float32),mac_norm
def get_square_centres(c_err = 0.05,scale=scale):
"""
define macbeth square centres
"""
verts,mac_norm = get_square_verts(c_err,scale=scale)
centres = np.mean(verts,axis = 1)
# print('centres')
# print(centres)
return np.array(centres,np.float32)

File diff suppressed because one or more lines are too long

View file

@ -0,0 +1,141 @@
# SPDX-License-Identifier: BSD-2-Clause
#
# Copyright (C) 2019, Raspberry Pi (Trading) Limited
#
# ctt_tools.py - camera tuning tool miscellaneous
import time
import re
import binascii
import os
import cv2
import numpy as np
import imutils
import sys
import matplotlib.pyplot as plt
from sklearn import cluster as cluster
from sklearn.neighbors.nearest_centroid import NearestCentroid as get_centroids
"""
This file contains some useful tools, the details of which aren't important to
understanding of the code. They ar collated here to attempt to improve code
readability in the main files.
"""
"""
obtain config values, unless it doesnt exist, in which case pick default
Furthermore, it can check if the input is the correct type
"""
def get_config(dictt,key,default,ttype):
try:
val = dictt[key]
if ttype == 'string':
val = str(val)
elif ttype == 'num':
if 'int' not in str(type(val)):
if 'float' not in str(type(val)):
raise ValueError
elif ttype == 'dict':
if type(val) != type(dictt):
raise ValueError
elif ttype == 'list':
if type(val) != type([]):
raise ValueError
elif ttype == 'bool':
ttype = int(bool(ttype))
else:
val = dictt[key]
except (KeyError, ValueError):
val = default
return val
"""
argument parser
"""
def parse_input():
arguments = sys.argv[1:]
if len(arguments)%2 != 0:
raise ArgError('\n\nERROR! Enter value for each arguent passed.')
params = arguments [0::2]
vals = arguments [1::2]
args_dict = dict(zip(params,vals))
json_output = get_config(args_dict,'-o',None,'string')
directory = get_config(args_dict,'-i',None,'string')
config = get_config(args_dict,'-c',None,'string')
log_path = get_config(args_dict,'-l',None,'string')
if directory == None:
raise ArgError('\n\nERROR! No input directory given.')
if json_output == None:
raise ArgError('\n\nERROR! No output json given.')
return json_output,directory,config,log_path
"""
custom arg and macbeth error class
"""
class ArgError(Exception):
pass
class MacbethError(Exception):
pass
"""
correlation function to quantify match
"""
def correlate(im1,im2):
f1 = im1.flatten()
f2 = im2.flatten()
cor = np.corrcoef(f1,f2)
return cor[0][1]
"""
get list of files from directory
"""
def get_photos(directory='photos'):
filename_list = []
for filename in os.listdir(directory):
if 'jp' in filename or '.dng' in filename:
filename_list.append(filename)
return filename_list
"""
display image for debugging... read at your own risk...
"""
def represent(img,name='image'):
# if type(img) == tuple or type(img) == list:
# for i in range(len(img)):
# name = 'image {}'.format(i)
# cv2.imshow(name,img[i])
# else:
# cv2.imshow(name,img)
# cv2.waitKey(0)
# cv2.destroyAllWindows()
# return 0
"""
code above displays using opencv, but this doesn't catch users pressing 'x'
with their mouse to close the window.... therefore matplotlib is used....
(thanks a lot opencv)
"""
grid = plt.GridSpec(22,1)
plt.subplot(grid[:19,0])
plt.imshow(img,cmap='gray')
plt.axis('off')
plt.subplot(grid[21,0])
plt.title('press \'q\' to continue')
plt.axis('off')
plt.show()
# f = plt.figure()
# ax = f.add_subplot(211)
# ax2 = f.add_subplot(122)
# ax.imshow(img,cmap='gray')
# ax.axis('off')
# ax2.set_figheight(2)
# ax2.title('press \'q\' to continue')
# ax2.axis('off')
# plt.show()
"""
reshape image to fixed width without distorting
returns image and scale factor
"""
def reshape(img,width):
factor = width/img.shape[0]
return cv2.resize(img,None,fx=factor,fy=factor),factor