🤖 Программирование роботов для совместной работы

От теории к практике многоагентных систем

👥 Команда • 🔄 Синхронизация • 💻 Программирование • 🎯 Задачи
7 класс • Технология • 45 минут
mw285748 • 15.06.2025

🎯 Цель урока

💡 Научимся:

  • Программировать синхронизацию действий роботов
  • Создавать протоколы коммуникации между роботами
  • Распределять задачи между агентами системы
  • Отлаживать распределенные робототехнические системы

🤖 Результат: Функционирующая команда роботов для совместного решения задач!

🌟 От одиночки к команде

🔍 Сравнение подходов

Один робот VS Команда роботов:

📊 Производительность:

\[P_{team} = \sum_{i=1}^{n} P_i \times k_{coordination} \times k_{specialization}\]

где:

  • P_i = производительность i-го робота
  • k_coordination = коэффициент координации (0.7-1.2)
  • k_specialization = коэффициент специализации (1.0-2.0)

⚡ Время выполнения:

\[T_{team} = \frac{T_{single}}{n_{effective}} + T_{coordination}\]

🎬 Примеры совместной работы

🏗️ Строительные роботы:

  • 🏗️ Один робот кладет кирпичи
  • 🚚 Другой доставляет материалы
  • 📐 Третий контролирует качество
  • ⏱️ Результат: здание за 1/3 времени

📦 Складские системы:

  • 🤖 100+ роботов работают одновременно
  • 📍 Каждый знает свою зону ответственности
  • 🚫 Избегают столкновений автоматически
  • 📈 Эффективность на 300% выше

🔬 Исследование Марса:

  • 🚁 Вертолет Ingenuity разведывает маршруты
  • 🚗 Perseverance движется по безопасному пути
  • 📡 Орбитальные спутники передают данные на Землю

🏗️ Архитектуры взаимодействия

👑 Централизованная модель

Структура:

 1class CentralizedSystem:
 2    def __init__(self):
 3        self.coordinator = MainController()
 4        self.robots = [Robot(i) for i in range(3)]
 5        
 6    def execute_task(self, task):
 7        # Центральный планировщик
 8        subtasks = self.coordinator.split_task(task)
 9        
10        # Распределение подзадач
11        for i, subtask in enumerate(subtasks):
12            self.robots[i].assign_task(subtask)
13            
14        # Мониторинг выполнения
15        while not self.all_completed():
16            status = self.coordinator.monitor_progress()
17            if status.need_rebalancing:
18                self.coordinator.rebalance_tasks()

Применение: Складские системы, производственные линии

🌐 Децентрализованная модель

Принцип роевого интеллекта:

 1class SwarmRobot:
 2    def __init__(self, robot_id):
 3        self.id = robot_id
 4        self.neighbors = []
 5        self.local_goal = None
 6        
 7    def swarm_behavior(self):
 8        while True:
 9            # Обмен информацией с соседями
10            neighbor_info = self.communicate_with_neighbors()
11            
12            # Локальное принятие решений
13            local_decision = self.make_local_decision(neighbor_info)
14            
15            # Выполнение действия
16            self.execute_action(local_decision)
17            
18            # Адаптация поведения
19            self.adapt_behavior()
20            
21    def make_local_decision(self, neighbor_info):
22        # Простые правила роевого поведения
23        if self.too_crowded(neighbor_info):
24            return "spread_out"
25        elif self.isolated():
26            return "join_group"
27        else:
28            return "continue_task"

Применение: Дроны-исследователи, поисковые операции

🔄 Гибридная модель

Иерархическая структура:

 1class HybridSystem:
 2    def __init__(self):
 3        self.global_coordinator = GlobalPlanner()
 4        self.local_groups = [LocalGroup(i) for i in range(3)]
 5        
 6    def execute_mission(self, mission):
 7        # Глобальное планирование
 8        global_plan = self.global_coordinator.create_plan(mission)
 9        
10        # Распределение между группами
11        for group, task in zip(self.local_groups, global_plan.tasks):
12            group.assign_group_task(task)
13            
14        # Локальная координация в группах
15        for group in self.local_groups:
16            group.coordinate_locally()
17
18class LocalGroup:
19    def coordinate_locally(self):
20        # Децентрализованная координация внутри группы
21        for robot in self.robots:
22            robot.negotiate_with_neighbors()
23            robot.execute_local_plan()

📡 Протоколы коммуникации

💬 Типы сообщений

Статусные сообщения:

 1class StatusMessage:
 2    def __init__(self, sender_id):
 3        self.sender_id = sender_id
 4        self.timestamp = time.now()
 5        self.position = get_current_position()
 6        self.battery_level = read_battery()
 7        self.current_task = get_active_task()
 8        self.status = "active"  # active, busy, error, idle
 9        
10    def to_json(self):
11        return {
12            "type": "status",
13            "sender": self.sender_id,
14            "data": {
15                "position": self.position,
16                "battery": self.battery_level,
17                "task": self.current_task,
18                "status": self.status
19            }
20        }

Запросы координации:

 1class CoordinationRequest:
 2    def __init__(self, sender_id, request_type):
 3        self.sender_id = sender_id
 4        self.request_type = request_type  # help, permission, negotiation
 5        
 6    def create_help_request(self, task_description, required_robots=1):
 7        return {
 8            "type": "help_request",
 9            "sender": self.sender_id,
10            "task": task_description,
11            "robots_needed": required_robots,
12            "priority": self.calculate_priority(),
13            "deadline": self.estimate_deadline()
14        }

🔄 Алгоритмы синхронизации

Барьерная синхронизация:

 1class BarrierSync:
 2    def __init__(self, robot_count):
 3        self.total_robots = robot_count
 4        self.arrived_robots = set()
 5        
 6    def wait_for_all(self, robot_id):
 7        self.arrived_robots.add(robot_id)
 8        
 9        # Оповещение о прибытии
10        self.broadcast(f"Robot {robot_id} reached checkpoint")
11        
12        # Ожидание остальных
13        while len(self.arrived_robots) < self.total_robots:
14            time.sleep(0.1)
15            
16        # Все роботы готовы - продолжаем
17        self.broadcast("All robots synchronized - continue")
18        return True
19
20# Использование в роботе
21def synchronized_movement():
22    move_to_checkpoint()
23    barrier.wait_for_all(my_robot_id)  # Ждем всех
24    continue_mission()  # Продолжаем вместе

Консенсус-алгоритм:

 1class ConsensusProtocol:
 2    def __init__(self, robot_id, neighbors):
 3        self.robot_id = robot_id
 4        self.neighbors = neighbors
 5        self.proposals = {}
 6        
 7    def propose_action(self, action):
 8        # Предлагаем действие соседям
 9        proposal = {
10            "action": action,
11            "sender": self.robot_id,
12            "votes": 0
13        }
14        
15        # Рассылка предложения
16        for neighbor in self.neighbors:
17            neighbor.send("proposal", proposal)
18            
19        # Сбор голосов
20        votes = self.collect_votes(proposal["id"])
21        
22        # Принятие решения по большинству
23        if votes > len(self.neighbors) / 2:
24            return self.execute_action(action)
25        else:
26            return self.wait_for_alternative()

🛡️ Обработка ошибок связи

Обнаружение потери связи:

 1class CommunicationMonitor:
 2    def __init__(self):
 3        self.heartbeat_interval = 1.0  # секунда
 4        self.timeout_threshold = 3.0   # 3 секунды
 5        self.last_seen = {}
 6        
 7    def monitor_robots(self, robot_list):
 8        while True:
 9            current_time = time.now()
10            
11            for robot_id in robot_list:
12                last_contact = self.last_seen.get(robot_id, 0)
13                
14                if current_time - last_contact > self.timeout_threshold:
15                    self.handle_lost_robot(robot_id)
16                    
17            time.sleep(0.5)
18            
19    def handle_lost_robot(self, robot_id):
20        print(f"Lost contact with robot {robot_id}")
21        
22        # Перераспределение задач
23        lost_tasks = self.get_robot_tasks(robot_id)
24        self.redistribute_tasks(lost_tasks)
25        
26        # Уведомление команды
27        self.broadcast(f"Robot {robot_id} offline - tasks redistributed")

🛠️ Практическая работа

📋 Проектные задачи

🚚 Задача 1: “Транспортная команда”

  • 2 робота несут длинный груз
  • Синхронизация скорости и направления
  • Координированные повороты
  • Остановка при препятствиях

🗺️ Задача 2: “Исследователи территории”

  • 3 робота исследуют разные зоны
  • Обмен картами местности
  • Избежание дублирования работы
  • Сбор в точке рандеву

🔧 Задача 3: “Сборочная линия”

  • Конвейер из 3 роботов
  • Каждый выполняет свой этап сборки
  • Передача изделий между роботами
  • Контроль качества на каждом этапе

🏗️ Задача 4: “Строительная бригада”

  • Роботы строят башню из блоков
  • Один подает материалы
  • Другой устанавливает блоки
  • Третий контролирует устойчивость

💻 Базовый код для старта

Класс робота с коммуникацией:

 1class CollaborativeRobot:
 2    def __init__(self, robot_id, communication_module):
 3        self.id = robot_id
 4        self.comm = communication_module
 5        self.position = Position(0, 0)
 6        self.task_queue = []
 7        self.partners = []
 8        
 9    def send_status(self):
10        status = {
11            "id": self.id,
12            "position": self.position,
13            "battery": self.get_battery_level(),
14            "available": len(self.task_queue) == 0
15        }
16        self.comm.broadcast("status", status)
17        
18    def receive_message(self, message):
19        if message["type"] == "task_assignment":
20            self.assign_task(message["task"])
21        elif message["type"] == "coordination_request":
22            self.handle_coordination_request(message)
23        elif message["type"] == "emergency":
24            self.handle_emergency(message)
25            
26    def collaborate_with(self, partner_id, task):
27        # Установление партнерства
28        self.partners.append(partner_id)
29        
30        # Планирование совместной работы
31        joint_plan = self.plan_collaboration(task)
32        
33        # Отправка плана партнеру
34        self.comm.send_to(partner_id, "joint_plan", joint_plan)
35        
36        # Выполнение своей части
37        self.execute_plan_part(joint_plan.my_part)

⏱️ План работы (60 минут)

 1Этап 1: Выбор задачи и планирование (10 мин)
 2• Выбор проектной задачи
 3• Определение ролей роботов
 4• Планирование алгоритма взаимодействия
 5
 6Этап 2: Разработка протоколов (15 мин)
 7• Создание схемы коммуникации
 8• Определение типов сообщений
 9• Проектирование алгоритмов синхронизации
10
11Этап 3: Программирование (25 мин)
12• Написание кода для каждого робота
13• Реализация протоколов связи
14• Программирование логики координации
15
16Этап 4: Тестирование и отладка (15 мин)
17• Проверка взаимодействия роботов
18• Отладка синхронизации
19• Оптимизация производительности
20
21Этап 5: Демонстрация (10 мин)
22• Презентация решения
23• Демонстрация работы команды роботов

🎯 Алгоритмы для задач

🚚 Транспортная команда

 1class TransportTeam:
 2    def __init__(self):
 3        self.leader = Robot("transport_leader")
 4        self.follower = Robot("transport_follower")
 5        self.cargo_length = 50  # см
 6        
 7    def synchronized_movement(self, target):
 8        # Лидер планирует маршрут
 9        path = self.leader.plan_path_to(target)
10        
11        # Отправка плана ведомому
12        self.leader.send_to_follower("path_plan", path)
13        
14        # Синхронное движение
15        for waypoint in path:
16            # Лидер движется к точке
17            self.leader.move_to(waypoint)
18            
19            # Ведомый следует с учетом длины груза
20            follower_target = self.calculate_follower_position(waypoint)
21            self.follower.move_to(follower_target)
22            
23            # Ожидание завершения движения
24            self.wait_for_synchronization()
25            
26    def coordinated_turn(self, angle):
27        # Специальный алгоритм для поворота с грузом
28        turn_center = self.calculate_turn_center()
29        
30        # Лидер поворачивает по внешнему радиусу
31        leader_radius = self.cargo_length / 2 + self.leader.width / 2
32        
33        # Ведомый - по внутреннему радиусу
34        follower_radius = self.cargo_length / 2 - self.follower.width / 2
35        
36        # Синхронное выполнение поворота
37        self.execute_synchronized_turn(leader_radius, follower_radius, angle)

🗺️ Исследователи территории

 1class ExplorationTeam:
 2    def __init__(self, territory_bounds):
 3        self.robots = [Robot(f"explorer_{i}") for i in range(3)]
 4        self.territory = territory_bounds
 5        self.shared_map = SharedMap()
 6        
 7    def divide_territory(self):
 8        # Разделение территории на зоны
 9        zones = self.territory.split_into_zones(len(self.robots))
10        
11        # Назначение зон роботам
12        for robot, zone in zip(self.robots, zones):
13            robot.assign_exploration_zone(zone)
14            
15    def explore_collaboratively(self):
16        while not self.territory_fully_explored():
17            for robot in self.robots:
18                # Исследование своей зоны
19                new_discoveries = robot.explore_step()
20                
21                # Обновление общей карты
22                self.shared_map.update(new_discoveries)
23                
24                # Обмен информацией с командой
25                robot.share_discoveries(new_discoveries)
26                
27                # Проверка на необходимость помощи
28                if robot.needs_help():
29                    self.request_assistance(robot.id)
30                    
31    def request_assistance(self, requesting_robot_id):
32        # Поиск свободного робота для помощи
33        available_robots = [r for r in self.robots 
34                          if r.id != requesting_robot_id and r.is_available()]
35        
36        if available_robots:
37            helper = min(available_robots, 
38                        key=lambda r: r.distance_to(requesting_robot_id))
39            helper.assist_robot(requesting_robot_id)

🔧 Сборочная линия

 1class AssemblyLine:
 2    def __init__(self):
 3        self.stations = [
 4            AssemblyStation("preparation", 0),
 5            AssemblyStation("assembly", 1), 
 6            AssemblyStation("quality_check", 2)
 7        ]
 8        self.conveyor = Conveyor()
 9        
10    def process_item(self, item):
11        current_station = 0
12        
13        while current_station < len(self.stations):
14            station = self.stations[current_station]
15            
16            # Обработка на текущей станции
17            processed_item = station.process(item)
18            
19            # Уведомление следующей станции
20            if current_station < len(self.stations) - 1:
21                next_station = self.stations[current_station + 1]
22                next_station.prepare_for_item(processed_item)
23                
24            # Передача на следующую станцию
25            if current_station < len(self.stations) - 1:
26                self.conveyor.transfer_item(processed_item, current_station + 1)
27                
28            current_station += 1
29            
30class AssemblyStation:
31    def process(self, item):
32        # Ожидание готовности станции
33        while not self.is_ready():
34            time.sleep(0.1)
35            
36        # Выполнение операции
37        result = self.perform_operation(item)
38        
39        # Сигнал о завершении
40        self.signal_completion()
41        
42        return result

🎤 Демонстрация и защита

📊 Презентация решений

План выступления (4 минуты на команду):

  1. Задача - что должна решать команда роботов?
  2. Архитектура - как организовано взаимодействие?
  3. Протоколы - как роботы общаются друг с другом?
  4. Демонстрация - показ работы в действии
  5. Анализ - что получилось, что можно улучшить?

❓ Вопросы для обсуждения:

  • Как система адаптируется к отказу одного робота?
  • Можно ли масштабировать решение на больше роботов?
  • Какие узкие места есть в алгоритме?
  • Как оптимизировать время выполнения задачи?

🏆 Критерии оценки

📊 Матрица оценивания (20 баллов):

Критерий Максимум Описание
Алгоритмы 5 Эффективность, оптимальность, обработка ошибок
Техническая реализация 5 Код, коммуникация, надежность
Выполнение задачи 5 Успешность, время, устойчивость
Презентация 5 Демонстрация, понимание, анализ

🎯 Шкала оценок:

  • 18-20: “5” (отлично)
  • 14-17: “4” (хорошо)
  • 10-13: “3” (удовлетворительно)

🔧 Отладка распределенных систем

🐛 Типичные проблемы

Проблема синхронизации:

 1# ❌ Плохо - роботы не ждут друг друга
 2def bad_synchronization():
 3    robot1.start_task()
 4    robot2.start_task()  # Может начать раньше robot1
 5    
 6# ✅ Хорошо - явная синхронизация
 7def good_synchronization():
 8    barrier = SynchronizationBarrier(2)
 9    
10    robot1.prepare_task()
11    robot2.prepare_task()
12    
13    barrier.wait()  # Ждем готовности всех
14    
15    robot1.start_task()
16    robot2.start_task()

Проблема deadlock (взаимная блокировка):

 1# ❌ Deadlock - роботы ждут друг друга вечно
 2def potential_deadlock():
 3    robot1.request_resource(resource_A)
 4    robot1.request_resource(resource_B)  # Может зависнуть
 5    
 6    robot2.request_resource(resource_B)
 7    robot2.request_resource(resource_A)  # Может зависнуть
 8    
 9# ✅ Избежание deadlock - упорядоченный захват ресурсов
10def avoid_deadlock():
11    # Всегда захватываем ресурсы в одинаковом порядке
12    resources = sorted([resource_A, resource_B])
13    
14    for robot in [robot1, robot2]:
15        for resource in resources:
16            robot.request_resource(resource)

📊 Мониторинг системы

 1class SystemMonitor:
 2    def __init__(self, robots):
 3        self.robots = robots
 4        self.performance_log = []
 5        
 6    def monitor_performance(self):
 7        while True:
 8            metrics = {
 9                "timestamp": time.now(),
10                "active_robots": len([r for r in self.robots if r.is_active()]),
11                "task_completion_rate": self.calculate_completion_rate(),
12                "communication_latency": self.measure_latency(),
13                "system_efficiency": self.calculate_efficiency()
14            }
15            
16            self.performance_log.append(metrics)
17            
18            # Предупреждения о проблемах
19            if metrics["communication_latency"] > 1000:  # мс
20                self.alert("High communication latency detected")
21                
22            if metrics["system_efficiency"] < 0.7:
23                self.alert("System efficiency below threshold")
24                
25            time.sleep(5)  # Мониторинг каждые 5 секунд

🤔 Рефлексия “Светофор+”

📝 Анализ работы

🟢 ЗЕЛЕНЫЙ (что получилось хорошо):

  • 🟡 ЖЕЛТЫЙ (что можно улучшить):

  • 🔴 КРАСНЫЙ (что вызвало затруднения):

  • ➕ ПЛЮС (что взять на заметку):

    Оценка работы команды (1-5): ___

    🏠 Домашнее задание

    🎯 Отчет по практической работе

    📋 Структура отчета:

    1. Название проекта многоагентной системы
    2. Цель и задачи системы
    3. Состав команды роботов и их роли
    4. Алгоритмы координации с блок-схемами
    5. Система коммуникации и протоколы
    6. Результаты тестирования и проблемы
    7. Перспективы развития системы
    8. Выводы и полученный опыт

    🌟 Творческое задание

    Предложить улучшения для системы:

    • Добавить машинное обучение для оптимизации
    • Интегрировать с системами IoT
    • Создать веб-интерфейс для мониторинга
    • Разработать алгоритмы самовосстановления

    🎉 Итоги урока

    🏆 Что освоили

    ✅ Научились:

    • Программировать синхронизацию действий роботов
    • Создавать протоколы межроботной коммуникации
    • Распределять задачи между агентами системы
    • Отлаживать сложные распределенные системы

    🧠 Поняли:

    • Координация роботов требует специальных алгоритмов
    • Коммуникация критически важна для совместной работы
    • Отказоустойчивость обеспечивает надежность системы
    • Мониторинг необходим для поддержания производительности

    🌟 Главный принцип

    “Эффективная команда роботов = Правильные алгоритмы + Надежная связь + Умная координация”

    🚀 Следующий шаг: Создание автономных адаптивных систем с машинным обучением

    💡 Теперь ваши роботы умеют работать в команде как настоящие профессионалы!