算法研究

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
import numpy as np
import matplotlib.pyplot as plt

def func(x):

if x.ndim == 1:
x = x.reshape(1, -1)
A = 10
n = x.shape[0]
return A * n + np.sum(x**2 - A * np.cos(2 * np.pi * x), axis=0)


def fast_flood_fill_optimizer_V3(
func,
bounds,
n_droplets_initial=200,
max_iterations=500,
water_level_rise_factor=0.01,
droplet_injection_scale=0.05,
grid_res=50,
verbose=True,
splash_rate=0.05,
splash_count_min=1,

patience=20,
min_delta=1e-6
):
dim = len(bounds)
if dim > 2:
print("Warning: Visualization is only for 2D. Running optimization in %dD." % dim)

min_b = np.array([b[0] for b in bounds])
max_b = np.array([b[1] for b in bounds])
search_space_range = np.linalg.norm(max_b - min_b)

droplets = np.zeros((dim, n_droplets_initial))
for d_idx in range(dim):
low = bounds[d_idx][0]
high = bounds[d_idx][1]
droplets[d_idx, :] = np.random.uniform(low, high, n_droplets_initial)

droplet_values = func(droplets)

grid_min_values = np.full((grid_res, grid_res), np.inf)

best_solution = droplets[:, np.argmin(droplet_values)]
best_value = np.min(droplet_values)

def get_grid_idx(pos):
scaled_pos = (pos - min_b) / (max_b - min_b)
idx = np.floor(scaled_pos * grid_res).astype(int)
idx = np.clip(idx, 0, grid_res - 1)
return idx[0], idx[1]

water_level = best_value
water_level_rise_step = water_level_rise_factor * search_space_range

history = [best_value]

best_value_stagnant_count = 0

print(f"Starting Fast Flood-Fill optimization with QDA-style stopping criteria...")

for iteration in range(max_iterations):


for i in range(droplets.shape[1]):
x, y = get_grid_idx(droplets[:, i])
grid_min_values[x, y] = min(grid_min_values[x, y], droplet_values[i])


water_level += water_level_rise_step


flooded_grid = grid_min_values <= water_level
visited = np.zeros_like(flooded_grid, dtype=bool)
clusters = []

for i in range(grid_res):
for j in range(grid_res):
if flooded_grid[i, j] and not visited[i, j]:
cluster_grid_indices = []
queue = [(i, j)]
visited[i, j] = True

while queue:
x_g, y_g = queue.pop(0)
cluster_grid_indices.append((x_g, y_g))
for dx, dy in [(0, 1), (0, -1), (1, 0), (-1, 0)]:
nx, ny = x_g + dx, y_g + dy
if 0 <= nx < grid_res and 0 <= ny < grid_res and flooded_grid[nx, ny] and not visited[nx, ny]:
visited[nx, ny] = True
queue.append((nx, ny))
clusters.append(cluster_grid_indices)


newly_injected_droplets = []
for cluster_grid in clusters:
best_in_cluster_value = np.inf
best_in_cluster_pos = None

for i in range(droplets.shape[1]):
x, y = get_grid_idx(droplets[:, i])
if (x, y) in cluster_grid and droplet_values[i] < best_in_cluster_value:
best_in_cluster_value = droplet_values[i]
best_in_cluster_pos = droplets[:, i]

if best_in_cluster_pos is not None:
injection_sigma = droplet_injection_scale * search_space_range
injected_droplet = np.random.normal(best_in_cluster_pos.reshape(-1, 1), scale=injection_sigma, size=(dim, 1))
injected_droplet = np.clip(injected_droplet, min_b.reshape(-1, 1), max_b.reshape(-1, 1))
newly_injected_droplets.append(injected_droplet)

splash_count = max(splash_count_min, int(n_droplets_initial * splash_rate))
splashed_droplets = np.zeros((dim, splash_count))
for d_idx in range(dim):
splashed_droplets[d_idx, :] = np.random.uniform(bounds[d_idx][0], bounds[d_idx][1], splash_count)


new_droplets_array = np.hstack(newly_injected_droplets + [splashed_droplets])
droplets = np.hstack((droplets, new_droplets_array))
droplet_values = func(droplets)

old_best_value = best_value
current_best_value = np.min(droplet_values)
if current_best_value < best_value:
best_value = current_best_value
best_solution = droplets[:, np.argmin(droplet_values)]

history.append(best_value)


if abs(best_value - old_best_value) < min_delta:
best_value_stagnant_count += 1
else:
best_value_stagnant_count = 0

if verbose and iteration % 10 == 0:
print(f"Iteration {iteration}: Water Level = {water_level:.4f}, Best Value = {best_value:.4f}, Droplets = {droplets.shape[1]}, Clusters = {len(clusters)}")


if best_value_stagnant_count >= patience:
print(f"Best value has not improved for {patience} iterations, terminating...")
break

print("\nOptimization finished.")
return best_solution, best_value, history


if __name__ == "__main__":
bounds = [(-5.12, 5.12), (-5.12, 5.12)]

final_solution, final_value, opt_history = fast_flood_fill_optimizer_V3(
func,
bounds,
n_droplets_initial=200,
max_iterations=500,
water_level_rise_factor=0.005,
droplet_injection_scale=0.01,
grid_res=50,
splash_rate=0.1,
splash_count_min=5,
# 新增的QDA风格结束条件参数
patience=20,
min_delta=1e-8
)

print(f"\nFinal Best Solution: {final_solution}")
print(f"Final Best Value: {final_value:.4f}")

plt.figure()
plt.plot(opt_history)
plt.title("Fast Flood-Fill Optimization Progress with QDA-style stopping")
plt.xlabel("Iteration")
plt.ylabel("Objective Function Value")
plt.grid(True)
plt.show()

结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
Starting Fast Flood-Fill optimization with QDA-style stopping criteria...
Iteration 0: Water Level = 3.1083, Best Value = 3.0358, Droplets = 221, Clusters = 1
Iteration 10: Water Level = 3.8323, Best Value = 2.7491, Droplets = 436, Clusters = 2
Iteration 20: Water Level = 4.5564, Best Value = 2.7491, Droplets = 661, Clusters = 3
Iteration 30: Water Level = 5.2805, Best Value = 2.0884, Droplets = 889, Clusters = 3
Iteration 40: Water Level = 6.0046, Best Value = 1.0252, Droplets = 1136, Clusters = 6
Iteration 50: Water Level = 6.7286, Best Value = 1.0071, Droplets = 1432, Clusters = 13
Iteration 60: Water Level = 7.4527, Best Value = 0.2456, Droplets = 1775, Clusters = 15
Iteration 70: Water Level = 8.1768, Best Value = 0.2456, Droplets = 2125, Clusters = 15
Best value has not improved for 20 iterations, terminating...

Optimization finished.

Final Best Solution: [-0.03198783 0.01477733]
Final Best Value: 0.2456

image

算法简述:

首先随机进行全局的“洒水”,找到其中的最低点。为了速率,将其分成很多个网格,我们用网格中的局部最小值代表其中的最低点,然后我们在整个区域中的最小值注水,逐步提高他的能量值,进行迭代,直到看到两个网格联通,我们将会将这两个网格看成一个“湖泊”不再分开进行洒水,将会在这个湖泊的(两个或多个网格的最低点洒水),为了增加随机性,还会将随机洒水(模拟在注水中水滴的飞溅)。其收敛条件参照qda。

算法前身

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
import numpy as np
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D


def objective_function(x):
if x.ndim == 1:
x = x.reshape(1, -1)

A = 10
n = x.shape[1]
return A * n + np.sum(x**2 - A * np.cos(2 * np.pi * x), axis=1)

def analytical_gradient(x):
A = 10
return 2 * x + 2 * np.pi * A * np.sin(2 * np.pi * x)


def water_flow_erosion_optimizer(
func,
grad_func,
bounds,
n_droplets=50,
max_iterations=1000,
learning_rate=0.1,
erosion_rate=0.2,
decay_rate=0.95,
random_perturbation_scale=0.05,
grid_res=50,
verbose=False
):
dim = len(bounds)
if dim != 2 and verbose:
print("Warning: Visualization is only for 2D. Running optimization in %dD." % dim)

droplets = np.zeros((n_droplets, dim))
for d_idx in range(dim):
droplets[:, d_idx] = np.random.uniform(bounds[d_idx][0], bounds[d_idx][1], n_droplets)

min_b = np.array([b[0] for b in bounds])
max_b = np.array([b[1] for b in bounds])

water_level_grid = np.zeros((grid_res, grid_res))

def get_grid_idx(pos):
scaled_pos = (pos - min_b) / (max_b - min_b)
idx = np.floor(scaled_pos * grid_res).astype(int)
idx = np.clip(idx, 0, grid_res - 1)
return tuple(idx)

best_global_solution = droplets[0].copy()
best_global_value = func(best_global_solution).item()

history = []

if verbose:
print(f"Starting WFEO optimization with {n_droplets} droplets...")

for iteration in range(max_iterations):
for i in range(n_droplets):
current_pos = droplets[i].copy()

original_grad = grad_func(current_pos)

grid_idx = get_grid_idx(current_pos)
current_water_level = water_level_grid[grid_idx]

effective_grad = original_grad + current_water_level * np.sign(original_grad)

new_pos = current_pos - learning_rate * effective_grad
new_pos += np.random.normal(0, random_perturbation_scale, dim)

for d in range(dim):
new_pos[d] = np.clip(new_pos[d], bounds[d][0], bounds[d][1])

new_grid_idx = get_grid_idx(new_pos)
water_level_grid[new_grid_idx] += erosion_rate

droplets[i] = new_pos

current_droplet_value = func(new_pos).item()
if current_droplet_value < best_global_value:
best_global_value = current_droplet_value
best_global_solution = new_pos.copy()

water_level_grid *= decay_rate

history.append(best_global_value)
if verbose and iteration % 50 == 0:
print(f"Iteration {iteration}: Best value = {best_global_value:.4f}")

if verbose:
print("\nOptimization finished.")
return best_global_solution, best_global_value, history, water_level_grid

def run_tests(params, num_tests=5):
"""为一组参数运行多次测试,并返回结果。"""
print(f"\n--- Running Tests for Parameters: {params} ---")
bounds = [(-5.12, 5.12), (-5.12, 5.12)]
results = []

for i in range(num_tests):
_, final_value, _, _ = water_flow_erosion_optimizer(
objective_function,
analytical_gradient,
bounds,
**params,
verbose=False
)
results.append(final_value)
print(f"Test {i+1}: Final Best Value = {final_value:.4f}")

avg_value = np.mean(results)
std_value = np.std(results)
print(f"Average Final Value: {avg_value:.4f} ± {std_value:.4f}")
return avg_value, std_value, results



if __name__ == "__main__":
bounds = [(-5.12, 5.12), (-5.12, 5.12)]

params_baseline = {
'n_droplets': 100,
'max_iterations': 1000,
'learning_rate': 0.1,
'erosion_rate': 0.005,
'decay_rate': 0.995,
'random_perturbation_scale': 0.01,
'grid_res': 100
}

params_aggressive = {
'n_droplets': 200, # 更多水滴
'max_iterations': 1000,
'learning_rate': 0.1,
'erosion_rate': 0.01, # 更高的侵蚀率
'decay_rate': 0.99,
'random_perturbation_scale': 0.05, # 更大的扰动
'grid_res': 100
}

params_conservative = {
'n_droplets': 50, # 更少水滴
'max_iterations': 1000,
'learning_rate': 0.05, # 更低的学习率
'erosion_rate': 0.001, # 更低的侵蚀率
'decay_rate': 0.999,
'random_perturbation_scale': 0.005, # 更小的扰动
'grid_res': 100
}

print("--- 批量性能测试开始 ---")
avg_baseline, std_baseline, results_baseline = run_tests(params_baseline)
avg_aggressive, std_aggressive, results_aggressive = run_tests(params_aggressive)
avg_conservative, std_conservative, results_conservative = run_tests(params_conservative)
print("--- 批量性能测试结束 ---")

# 绘制结果箱线图进行比较
plt.figure(figsize=(8, 6))
plt.boxplot([results_baseline, results_aggressive, results_conservative],
labels=['Baseline', 'Aggressive', 'Conservative'])
plt.title('WFEO Performance Comparison Across Different Parameter Sets')
plt.ylabel('Final Best Value')
plt.xlabel('Parameter Strategy')
plt.grid(True)
plt.show()


print("\n--- 绘制一次详细运行结果(使用基准参数)---")
final_solution, final_value, opt_history, final_water_grid = water_flow_erosion_optimizer(
objective_function,
analytical_gradient,
bounds,
**params_baseline,
verbose=True # 详细输出
)

print(f"\nFinal Best Solution: {final_solution}")
print(f"Final Best Value: {final_value:.4f}")

fig = plt.figure(figsize=(15, 6))
ax1 = fig.add_subplot(121)
ax1.plot(opt_history)
ax1.set_title("Optimization Progress (Best Value per Iteration)")
ax1.set_xlabel("Iteration")
ax1.set_ylabel("Objective Function Value")
ax1.grid(True)

ax2 = fig.add_subplot(122, projection='3d')
x = np.linspace(bounds[0][0], bounds[0][1], 100)
y = np.linspace(bounds[1][0], bounds[1][1], 100)
X, Y = np.meshgrid(x, y)
points = np.stack([X.flatten(), Y.flatten()], axis=1)
Z = objective_function(points).reshape(X.shape)

ax2.plot_surface(X, Y, Z, cmap='viridis', alpha=0.7, rstride=1, cstride=1)

water_level_scale_factor = (np.max(Z) - np.min(Z)) / (np.max(final_water_grid) + 1e-9) * 0.1
Z_with_water = Z + final_water_grid * water_level_scale_factor
ax2.plot_surface(X, Y, Z_with_water, cmap='Blues', alpha=0.3, rstride=1, cstride=1)

ax2.scatter(final_solution[0], final_solution[1], final_value, color='red', s=100, label='Final Best', zorder=10)

ax2.set_title("Energy Landscape with Final Water Levels")
ax2.set_xlabel("X-axis")
ax2.set_ylabel("Y-axis")
ax2.set_zlabel("Energy")
ax2.legend()

plt.tight_layout()
plt.show()

算法解释:依赖梯度梯度解析,黑盒状态下很差,每个水滴都沿着负梯度方向移动,通过历史,多次探索过的地方将会积累水位,造成下降“减速”,促进向新方位探索,通过扰动和“蒸发(降低水位),避免过分的滚雪球,增加随机性。

效率奇差无比,速度缓慢收敛效果极差