TopicManager.hh
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2012 Open Source Robotics Foundation
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  *
16 */
17 #ifndef GAZEBO_TRANSPORT_TOPICMANAGER_HH_
18 #define GAZEBO_TRANSPORT_TOPICMANAGER_HH_
19 
20 // This fixes compiler warnings, see #3147 and #3160
21 #ifndef BOOST_BIND_GLOBAL_PLACEHOLDERS
22 #define BOOST_BIND_GLOBAL_PLACEHOLDERS
23 #endif
24 #include <boost/bind.hpp>
25 #include <boost/function.hpp>
26 #include <map>
27 #include <list>
28 #include <string>
29 #include <vector>
30 #include <boost/unordered/unordered_set.hpp>
31 
32 #include "gazebo/common/Assert.hh"
34 #include "gazebo/msgs/msgs.hh"
36 
45 #include "gazebo/util/system.hh"
46 
// Helper macro (see SingletonT.hh) that declares the typed SingletonT for
// gazebo::transport::TopicManager with GZ_TRANSPORT_VISIBLE visibility.
48 GZ_SINGLETON_DECLARE(GZ_TRANSPORT_VISIBLE, gazebo, transport, TopicManager)
49 
50 namespace gazebo
51 {
52  namespace transport
53  {
56 
59  class GZ_TRANSPORT_VISIBLE TopicManager : public SingletonT<TopicManager>
60  {
/// \brief Constructor. Private: the class derives from
/// SingletonT<TopicManager>, which is declared a friend of this class.
61  private: TopicManager();
/// \brief Destructor. Private for the same reason as the constructor.
62  private: virtual ~TopicManager();
63 
/// \brief Initialize the manager.
65  public: void Init();
66 
/// \brief Finalize the manager.
68  public: void Fini();
69 
/// \brief Find a publication object by topic.
/// \param[in] _topic Topic to look up.
/// \return The publication found for the topic.
/// NOTE(review): the result for an unknown topic is not visible in this
/// header; Advertise() asserts that the returned pointer is not nullptr.
73  public: PublicationPtr FindPublication(const std::string &_topic);
74 
/// \brief Add a node to the manager.
/// \param[in] _node The node to add.
77  public: void AddNode(NodePtr _node);
78 
/// \brief Remove a node by its id.
/// \param[in] _id Id of the node to remove.
81  public: void RemoveNode(unsigned int _id);
82 
/// \brief Process all nodes under management.
/// \param[in] _onlyOut Defaults to false.
/// NOTE(review): presumably restricts processing to outgoing messages;
/// confirm against the implementation file.
86  public: void ProcessNodes(bool _onlyOut = false);
87 
/// \brief Subscribe to a topic.
/// \param[in] _options Options for the subscription.
/// \return Pointer to the resulting Subscriber.
91  public: SubscriberPtr Subscribe(const SubscribeOptions &_options);
92 
/// \brief Unsubscribe from a topic.
/// \param[in] _topic Topic to unsubscribe from.
/// \param[in] _sub Node that is being unsubscribed.
97  public: void Unsubscribe(const std::string &_topic, const NodePtr &_sub);
98 
106  public: PublisherPtr Advertise(const std::string &_topic,
107  const std::string &_msgTypeName,
108  unsigned int _queueLimit,
109  double _hzRate)
110  {
111  this->UpdatePublications(_topic, _msgTypeName);
112 
113  PublisherPtr pub = PublisherPtr(new Publisher(_topic,
114  _msgTypeName, _queueLimit, _hzRate));
115 
116  // Connect all local subscription to the publisher
117  PublicationPtr publication = this->FindPublication(_topic);
118  GZ_ASSERT(publication != nullptr,
119  "FindPublication returned nullptr");
120 
121  publication->AddPublisher(pub);
122  if (!publication->GetLocallyAdvertised())
123  {
124  ConnectionManager::Instance()->Advertise(_topic,
125  _msgTypeName);
126  }
127 
128  publication->SetLocallyAdvertised(true);
129  pub->SetPublication(publication);
130 
131  for (auto &iter2 : this->subscribedNodes)
132  {
133  if (iter2.first == _topic)
134  {
135  for (const auto &liter : iter2.second)
136  {
137  publication->AddSubscription(liter);
138  }
139  }
140  }
141 
142  return pub;
143  }
144 
152  public: template<typename M>
153  PublisherPtr Advertise(const std::string &_topic,
154  unsigned int _queueLimit,
155  double _hzRate)
156  {
157  google::protobuf::Message *msg = nullptr;
158  M msgtype;
159  msg = dynamic_cast<google::protobuf::Message *>(&msgtype);
160  if (!msg)
161  gzthrow("Advertise requires a google protobuf type");
162 
163  return this->Advertise(_topic, msg->GetTypeName(), _queueLimit,
164  _hzRate);
165  }
166 
/// \brief Unadvertise a topic.
/// \param[in] _topic The topic to unadvertise.
169  public: void Unadvertise(const std::string &_topic);
170 
/// \brief Unadvertise a publisher.
/// \param[in] _pub Publisher to unadvertise.
173  public: void Unadvertise(PublisherPtr _pub);
174 
/// \brief Unadvertise a publisher, based on a publisher id.
/// \param[in] _topic The publisher's topic.
/// \param[in] _id Id of the publisher.
179  public: void Unadvertise(const std::string &_topic, const uint32_t _id);
180 
/// \brief Send a message.
/// \param[in] _topic Topic to publish on.
/// \param[in] _message The message to send.
/// \param[in] _cb Callback taking a uint32_t.
/// NOTE(review): when the callback fires and what value it receives are
/// not visible in this header; confirm against the implementation.
/// \param[in] _id NOTE(review): presumably the id handed to _cb; confirm.
187  public: void Publish(const std::string &_topic, MessagePtr _message,
188  boost::function<void(uint32_t)> _cb, uint32_t _id);
189 
/// \brief Connect a local Publisher to a remote Subscriber.
/// \param[in] _topic Topic shared by the publisher and the subscriber.
/// \param[in] _sublink Subscription transport for the remote subscriber.
193  public: void ConnectPubToSub(const std::string &_topic,
194  const SubscriptionTransportPtr _sublink);
195 
/// \brief Connect a local Subscriber to a remote Publisher.
/// \param[in] _pub msgs::Publish message describing the remote publisher.
198  public: void ConnectSubToPub(const msgs::Publish &_pub);
199 
/// \brief Disconnect a local publisher from a remote subscriber.
/// \param[in] _topic Topic of the connection.
/// \param[in] _host NOTE(review): presumably the remote subscriber's
/// host; confirm.
/// \param[in] _port NOTE(review): presumably the remote subscriber's
/// port; confirm.
204  public: void DisconnectPubFromSub(const std::string &_topic,
205  const std::string &_host,
206  unsigned int _port);
207 
/// \brief Disconnect all local subscribers from a remote publisher.
/// \param[in] _topic Topic of the connection.
/// \param[in] _host NOTE(review): presumably the remote publisher's
/// host; confirm.
/// \param[in] _port NOTE(review): presumably the remote publisher's
/// port; confirm.
212  public: void DisconnectSubFromPub(const std::string &_topic,
213  const std::string &_host,
214  unsigned int _port);
215 
/// \brief Connect all subscribers on a topic to known publishers.
/// \param[in] _topic The topic whose subscribers are connected.
218  public: void ConnectSubscribers(const std::string &_topic);
219 
/// \brief Update our list of advertised topics.
/// \param[in] _topic The topic to update.
/// \param[in] _msgType Message type name associated with the topic.
/// \return A PublicationPtr.
/// NOTE(review): presumably the publication for _topic; Advertise()
/// ignores this return value and calls FindPublication() instead.
225  public: PublicationPtr UpdatePublications(const std::string &_topic,
226  const std::string &_msgType);
227 
/// \brief Register a new topic namespace.
/// \param[in] _name Name of the namespace.
230  public: void RegisterTopicNamespace(const std::string &_name);
231 
/// \brief Get all the topic namespaces.
/// \param[out] _namespaces List that receives the namespaces.
234  public: void GetTopicNamespaces(std::list<std::string> &_namespaces);
235 
/// \brief Clear all buffers.
237  public: void ClearBuffers();
238 
/// \brief Pause or unpause processing of incoming messages.
/// \param[in] _pause NOTE(review): presumably true pauses and false
/// resumes; confirm.
241  public: void PauseIncoming(bool _pause);
242 
/// \brief Add a node to the list of nodes that requires processing.
/// \param[in] _ptr Node to process.
245  public: void AddNodeToProcess(NodePtr _ptr);
246 
/// \brief A map of string->list of Node pointers.
/// No access specifier precedes this typedef, so it takes the access of
/// the preceding public declaration.
248  typedef std::map<std::string, std::list<NodePtr> > SubNodeMap;
249 
/// \brief A map of string->PublicationPtr.
250  private: typedef std::map<std::string, PublicationPtr> PublicationPtr_M;
/// \brief Publications held by the manager.
/// NOTE(review): presumably keyed by topic name; confirm.
251  private: PublicationPtr_M advertisedTopics;
/// \brief Iterator into a PublicationPtr_M.
/// NOTE(review): presumably a cached advertisedTopics.end(); confirm
/// where it is refreshed.
252  private: PublicationPtr_M::iterator advertisedTopicsEnd;
/// \brief Subscribed nodes, keyed by topic name: Advertise() compares the
/// key against its _topic argument and passes each node in the mapped
/// list to Publication::AddSubscription().
253  private: SubNodeMap subscribedNodes;
/// \brief Vector of node pointers.
/// NOTE(review): presumably the nodes registered through AddNode().
254  private: std::vector<NodePtr> nodes;
255 
/// \brief Set of node pointers.
/// NOTE(review): presumably filled by AddNodeToProcess(); confirm.
257  private: boost::unordered_set<NodePtr> nodesToProcess;
258 
/// \brief Recursive mutex.
/// NOTE(review): what it guards is not visible in this header.
259  private: boost::recursive_mutex nodeMutex;
260 
/// \brief Mutex.
/// NOTE(review): presumably guards subscriber state. Advertise() reads
/// subscribedNodes without taking it; confirm whether that is intended.
262  private: boost::mutex subscriberMutex;
263 
/// \brief Mutex.
/// NOTE(review): presumably serializes ProcessNodes(); confirm.
265  private: boost::mutex processNodesMutex;
266 
/// \brief Boolean flag.
/// NOTE(review): presumably set by PauseIncoming(); confirm.
267  private: bool pauseIncoming;
268 
269  // Singleton implementation
/// \brief Grants SingletonT<TopicManager> access to the private
/// constructor and destructor.
270  private: friend class SingletonT<TopicManager>;
271  };
273  }
274 }
275 #endif
#define GZ_ASSERT(_expr, _msg)
This macro defines the standard way of launching an exception inside gazebo.
Definition: Assert.hh:24
transport
Definition: TopicManager.hh:48
Forward declarations for transport.
Singleton template class.
Definition: SingletonT.hh:34
static ConnectionManager * Instance()
Get an instance of the singleton.
Definition: SingletonT.hh:36
A publisher of messages on a topic.
Definition: Publisher.hh:46
Options for a subscription.
Definition: SubscribeOptions.hh:36
Manages topics and their subscriptions.
Definition: TopicManager.hh:60
void Fini()
Finalize the manager.
void ClearBuffers()
Clear all buffers.
void Unadvertise(PublisherPtr _pub)
Unadvertise a publisher.
void PauseIncoming(bool _pause)
Pause or unpause processing of incoming messages.
std::map< std::string, std::list< NodePtr > > SubNodeMap
A map of string->list of Node pointers.
Definition: TopicManager.hh:248
void Init()
Initialize the manager.
SubscriberPtr Subscribe(const SubscribeOptions &_options)
Subscribe to a topic.
void RemoveNode(unsigned int _id)
Remove a node by its id.
void Publish(const std::string &_topic, MessagePtr _message, boost::function< void(uint32_t)> _cb, uint32_t _id)
Send a message.
void Unadvertise(const std::string &_topic, const uint32_t _id)
Unadvertise a publisher, based on a publisher id.
void AddNode(NodePtr _node)
Add a node to the manager.
void RegisterTopicNamespace(const std::string &_name)
Register a new topic namespace.
PublicationPtr UpdatePublications(const std::string &_topic, const std::string &_msgType)
Update our list of advertised topics.
void GetTopicNamespaces(std::list< std::string > &_namespaces)
Get all the topic namespaces.
void ProcessNodes(bool _onlyOut=false)
Process all nodes under management.
void ConnectSubToPub(const msgs::Publish &_pub)
Connect a local Subscriber to a remote Publisher.
void AddNodeToProcess(NodePtr _ptr)
Add a node to the list of nodes that requires processing.
void DisconnectSubFromPub(const std::string &_topic, const std::string &_host, unsigned int _port)
Disconnect all local subscribers from a remote publisher.
void ConnectSubscribers(const std::string &_topic)
Connect all subscribers on a topic to known publishers.
void Unadvertise(const std::string &_topic)
Unadvertise a topic.
PublicationPtr FindPublication(const std::string &_topic)
Find a publication object by topic.
void ConnectPubToSub(const std::string &_topic, const SubscriptionTransportPtr _sublink)
Connect a local Publisher to a remote Subscriber.
void Unsubscribe(const std::string &_topic, const NodePtr &_sub)
Unsubscribe from a topic.
void DisconnectPubFromSub(const std::string &_topic, const std::string &_host, unsigned int _port)
Disconnect a local publisher from a remote subscriber.
PublisherPtr Advertise(const std::string &_topic, unsigned int _queueLimit, double _hzRate)
Advertise on a topic.
Definition: TopicManager.hh:153
PublisherPtr Advertise(const std::string &_topic, const std::string &_msgTypeName, unsigned int _queueLimit, double _hzRate)
Advertise on a topic.
Definition: TopicManager.hh:106
#define GZ_SINGLETON_DECLARE(visibility, n1, n2, singletonType)
Helper to declare typed SingletonT.
Definition: SingletonT.hh:61
#define gzthrow(msg)
This macro logs an error to the throw stream and throws an exception that contains the file name and ...
Definition: Exception.hh:39
boost::shared_ptr< Publication > PublicationPtr
Definition: TransportTypes.hh:61
boost::shared_ptr< Subscriber > SubscriberPtr
Definition: TransportTypes.hh:53
boost::shared_ptr< google::protobuf::Message > MessagePtr
Definition: TransportTypes.hh:45
boost::shared_ptr< SubscriptionTransport > SubscriptionTransportPtr
Definition: TransportTypes.hh:69
boost::shared_ptr< Publisher > PublisherPtr
Definition: TransportTypes.hh:49
boost::shared_ptr< Node > NodePtr
Definition: TransportTypes.hh:57
Forward declarations for the common classes.
Definition: Animation.hh:27