123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262 |
- #! /usr/bin/env python
- """
- Reads Darknet config and weights and creates Keras model with TF backend.
- """
- import argparse
- import configparser
- import io
- import os
- from collections import defaultdict
- import numpy as np
- from keras import backend as K
- from keras.layers import (Conv2D, Input, ZeroPadding2D, Add,
- UpSampling2D, MaxPooling2D, Concatenate)
- from keras.layers.advanced_activations import LeakyReLU
- from keras.layers import BatchNormalization
- from keras.models import Model
- from keras.regularizers import l2
- from keras.utils.vis_utils import plot_model as plot
# Command-line interface: three required file paths plus two boolean flags.
parser = argparse.ArgumentParser(description='Darknet To Keras Converter.')
parser.add_argument('config_path', help='Path to Darknet cfg file.')
parser.add_argument('weights_path', help='Path to Darknet weights file.')
parser.add_argument('output_path', help='Path to output Keras model file.')
parser.add_argument('-p', '--plot_model', action='store_true',
                    help='Plot generated Keras model and save as image.')
parser.add_argument('-w', '--weights_only', action='store_true',
                    help='Save as Keras weights file instead of model file.')
def unique_config_sections(config_file):
    """Convert all config sections to have unique names.

    Darknet cfg files repeat section headers (e.g. many ``[convolutional]``
    blocks), which ``configparser`` rejects.  Each header ``[name]`` is
    rewritten to ``[name_N]`` where N counts earlier occurrences of that
    name.  Returns the rewritten config as a text stream positioned at 0.
    """
    occurrence = defaultdict(int)
    rewritten = []
    with open(config_file) as fin:
        for raw in fin:
            # Only header lines start with '['; options, comments and
            # blank lines pass through untouched.
            if raw.startswith('['):
                name = raw.strip().strip('[]')
                suffixed = name + '_' + str(occurrence[name])
                occurrence[name] += 1
                raw = raw.replace(name, suffixed)
            rewritten.append(raw)
    return io.StringIO(''.join(rewritten))
- # %%
def _main(args):
    """Convert a Darknet model to a Keras ``.h5`` model or weights file.

    Walks the Darknet .cfg section-by-section, building the equivalent
    Keras graph while consuming the serialized weights from the .weights
    file in lockstep (the read order matters: each convolutional section
    reads its biases/BN stats/kernel in sequence).

    Args:
        args: argparse namespace with ``config_path``, ``weights_path``,
            ``output_path``, ``weights_only`` and ``plot_model``.

    Raises:
        ValueError: on an unknown activation or unsupported section type.
    """
    config_path = os.path.expanduser(args.config_path)
    weights_path = os.path.expanduser(args.weights_path)
    # NOTE: assert-based validation is stripped under `python -O`.
    assert config_path.endswith('.cfg'), '{} is not a .cfg file'.format(
        config_path)
    assert weights_path.endswith(
        '.weights'), '{} is not a .weights file'.format(weights_path)

    output_path = os.path.expanduser(args.output_path)
    assert output_path.endswith(
        '.h5'), 'output path {} is not a .h5 file'.format(output_path)
    output_root = os.path.splitext(output_path)[0]

    # Load weights and config.
    print('Loading weights.')
    weights_file = open(weights_path, 'rb')
    # Darknet weight files start with a version header (3 x int32)
    # followed by a "seen images" counter whose width depends on version.
    major, minor, revision = np.ndarray(
        shape=(3, ), dtype='int32', buffer=weights_file.read(12))
    if (major*10+minor)>=2 and major<1000 and minor<1000:
        # Newer Darknet versions store `seen` as a 64-bit integer.
        seen = np.ndarray(shape=(1,), dtype='int64', buffer=weights_file.read(8))
    else:
        seen = np.ndarray(shape=(1,), dtype='int32', buffer=weights_file.read(4))
    print('Weights Header: ', major, minor, revision, seen)

    print('Parsing Darknet config.')
    unique_config_file = unique_config_sections(config_path)
    cfg_parser = configparser.ConfigParser()
    cfg_parser.read_file(unique_config_file)

    print('Creating Keras model.')
    # Fully-convolutional: height/width left unspecified, 3 channels (RGB).
    input_layer = Input(shape=(None, None, 3))
    prev_layer = input_layer
    all_layers = []

    # [net] is renamed net_0 by unique_config_sections; fall back to the
    # Darknet default decay when absent.
    weight_decay = float(cfg_parser['net_0']['decay']
                         ) if 'net_0' in cfg_parser.sections() else 5e-4
    count = 0        # number of weight scalars consumed so far
    out_index = []   # indices into all_layers that become model outputs
    for section in cfg_parser.sections():
        print('Parsing section {}'.format(section))
        if section.startswith('convolutional'):
            filters = int(cfg_parser[section]['filters'])
            size = int(cfg_parser[section]['size'])
            stride = int(cfg_parser[section]['stride'])
            pad = int(cfg_parser[section]['pad'])
            activation = cfg_parser[section]['activation']
            batch_normalize = 'batch_normalize' in cfg_parser[section]

            padding = 'same' if pad == 1 and stride == 1 else 'valid'

            # Setting weights.
            # Darknet serializes convolutional weights as:
            # [bias/beta, [gamma, mean, variance], conv_weights]
            prev_layer_shape = K.int_shape(prev_layer)

            weights_shape = (size, size, prev_layer_shape[-1], filters)
            darknet_w_shape = (filters, weights_shape[2], size, size)
            # np.prod (not the removed-in-NumPy-2.0 np.product alias).
            weights_size = np.prod(weights_shape)

            print('conv2d', 'bn'
                  if batch_normalize else ' ', activation, weights_shape)

            conv_bias = np.ndarray(
                shape=(filters, ),
                dtype='float32',
                buffer=weights_file.read(filters * 4))
            count += filters

            if batch_normalize:
                bn_weights = np.ndarray(
                    shape=(3, filters),
                    dtype='float32',
                    buffer=weights_file.read(filters * 12))
                count += 3 * filters

                # With BN, the conv bias is used as the BN beta instead.
                bn_weight_list = [
                    bn_weights[0],  # scale gamma
                    conv_bias,  # shift beta
                    bn_weights[1],  # running mean
                    bn_weights[2]  # running var
                ]

            conv_weights = np.ndarray(
                shape=darknet_w_shape,
                dtype='float32',
                buffer=weights_file.read(weights_size * 4))
            count += weights_size

            # DarkNet conv_weights are serialized Caffe-style:
            # (out_dim, in_dim, height, width)
            # We would like to set these to Tensorflow order:
            # (height, width, in_dim, out_dim)
            conv_weights = np.transpose(conv_weights, [2, 3, 1, 0])
            conv_weights = [conv_weights] if batch_normalize else [
                conv_weights, conv_bias
            ]

            # Handle activation.
            act_fn = None
            if activation == 'leaky':
                pass  # Add advanced activation later.
            elif activation != 'linear':
                raise ValueError(
                    'Unknown activation function `{}` in section {}'.format(
                        activation, section))

            # Create Conv2D layer
            if stride > 1:
                # Darknet uses left and top padding instead of 'same' mode
                prev_layer = ZeroPadding2D(((1, 0), (1, 0)))(prev_layer)
            conv_layer = (Conv2D(
                filters, (size, size),
                strides=(stride, stride),
                kernel_regularizer=l2(weight_decay),
                use_bias=not batch_normalize,
                weights=conv_weights,
                activation=act_fn,
                padding=padding))(prev_layer)

            if batch_normalize:
                conv_layer = (BatchNormalization(
                    weights=bn_weight_list))(conv_layer)
            prev_layer = conv_layer

            if activation == 'linear':
                all_layers.append(prev_layer)
            elif activation == 'leaky':
                # Darknet 'leaky' uses slope 0.1.
                act_layer = LeakyReLU(alpha=0.1)(prev_layer)
                prev_layer = act_layer
                all_layers.append(act_layer)

        elif section.startswith('route'):
            # Route indices may be negative (relative to current layer).
            ids = [int(i) for i in cfg_parser[section]['layers'].split(',')]
            layers = [all_layers[i] for i in ids]
            if len(layers) > 1:
                print('Concatenating route layers:', layers)
                concatenate_layer = Concatenate()(layers)
                all_layers.append(concatenate_layer)
                prev_layer = concatenate_layer
            else:
                skip_layer = layers[0]  # only one layer to route
                all_layers.append(skip_layer)
                prev_layer = skip_layer

        elif section.startswith('maxpool'):
            size = int(cfg_parser[section]['size'])
            stride = int(cfg_parser[section]['stride'])
            all_layers.append(
                MaxPooling2D(
                    pool_size=(size, size),
                    strides=(stride, stride),
                    padding='same')(prev_layer))
            prev_layer = all_layers[-1]

        elif section.startswith('shortcut'):
            # Residual add with an earlier layer's output.
            index = int(cfg_parser[section]['from'])
            activation = cfg_parser[section]['activation']
            assert activation == 'linear', 'Only linear activation supported.'
            all_layers.append(Add()([all_layers[index], prev_layer]))
            prev_layer = all_layers[-1]

        elif section.startswith('upsample'):
            stride = int(cfg_parser[section]['stride'])
            assert stride == 2, 'Only stride=2 supported.'
            all_layers.append(UpSampling2D(stride)(prev_layer))
            prev_layer = all_layers[-1]

        elif section.startswith('yolo'):
            # The layer feeding each [yolo] head becomes a model output;
            # a None placeholder keeps all_layers aligned with cfg indices.
            out_index.append(len(all_layers)-1)
            all_layers.append(None)
            prev_layer = all_layers[-1]

        elif section.startswith('net'):
            pass  # hyperparameters only; no layer to build

        else:
            raise ValueError(
                'Unsupported section header type: {}'.format(section))

    # Create and save model.
    if len(out_index) == 0:
        out_index.append(len(all_layers)-1)
    model = Model(inputs=input_layer, outputs=[all_layers[i] for i in out_index])
    print(model.summary())
    if args.weights_only:
        model.save_weights('{}'.format(output_path))
        print('Saved Keras weights to {}'.format(output_path))
    else:
        model.save('{}'.format(output_path))
        print('Saved Keras model to {}'.format(output_path))

    # Check to see if all weights have been read.
    remaining_weights = len(weights_file.read()) / 4
    weights_file.close()
    print('Read {} of {} from Darknet weights.'.format(count, count +
                                                       remaining_weights))
    if remaining_weights > 0:
        print('Warning: {} unused weights'.format(remaining_weights))

    if args.plot_model:
        plot(model, to_file='{}.png'.format(output_root), show_shapes=True)
        print('Saved model plot to {}.png'.format(output_root))
if __name__ == '__main__':
    # Script entry point: parse CLI arguments and run the converter.
    _main(parser.parse_args())
|