diff --git a/python/ray/rllib/optimizers/async_replay_optimizer.py b/python/ray/rllib/optimizers/async_replay_optimizer.py index bacec3675..70779f9e9 100644 --- a/python/ray/rllib/optimizers/async_replay_optimizer.py +++ b/python/ray/rllib/optimizers/async_replay_optimizer.py @@ -203,11 +203,12 @@ class AsyncReplayOptimizer(PolicyOptimizer): self.timers = { k: TimerStat() for k in [ - "put_weights", "get_samples", "enqueue", "sample_processing", + "put_weights", "get_samples", "sample_processing", "replay_processing", "update_priorities", "train", "sample" ] } self.num_weight_syncs = 0 + self.num_samples_dropped = 0 self.learning_started = False # Number of worker steps since the last weight update @@ -283,9 +284,11 @@ class AsyncReplayOptimizer(PolicyOptimizer): with self.timers["replay_processing"]: for ra, replay in self.replay_tasks.completed(): self.replay_tasks.add(ra, ra.replay.remote()) - with self.timers["get_samples"]: - samples = ray.get(replay) - with self.timers["enqueue"]: + if self.learner.inqueue.full(): + self.num_samples_dropped += 1 + else: + with self.timers["get_samples"]: + samples = ray.get(replay) self.learner.inqueue.put((ra, samples)) with self.timers["update_priorities"]: @@ -311,6 +314,7 @@ class AsyncReplayOptimizer(PolicyOptimizer): 3), "train_throughput": round(self.timers["train"].mean_throughput, 3), "num_weight_syncs": self.num_weight_syncs, + "num_samples_dropped": self.num_samples_dropped, "learner_queue": self.learner.learner_queue_size.stats(), "replay_shard_0": replay_stats, }