Medical Imaging Interaction Toolkit  2018.4.99-389bf124
Medical Imaging Interaction Toolkit
mitkIGTLMessageProvider.cpp
Go to the documentation of this file.
1 /*============================================================================
2 
3 The Medical Imaging Interaction Toolkit (MITK)
4 
5 Copyright (c) German Cancer Research Center (DKFZ)
6 All rights reserved.
7 
8 Use of this source code is governed by a 3-clause BSD license that can be
9 found in the LICENSE file.
10 
11 ============================================================================*/
12 
14 
15 #include "mitkIGTLDevice.h"
16 #include "mitkIGTLMessage.h"
17 #include "mitkIGTLMessageFactory.h"
18 
20 
21 //Microservices
22 #include "usServiceReference.h"
23 #include "usModuleContext.h"
24 #include "usServiceEvent.h"
25 #include "mitkServiceInterface.h"
26 #include "usGetModuleContext.h"
27 
28 //igt (remove this later)
29 #include "igtlBindMessage.h"
30 #include "igtlQuaternionTrackingDataMessage.h"
31 #include "igtlTrackingDataMessage.h"
32 
33 #ifndef WIN32
34 #include <unistd.h>
35 #endif
36 
39 {
40  this->SetName("IGTLMessageProvider");
41  //m_MultiThreader = itk::MultiThreader::New();
42  m_StreamingTimeMutex = itk::FastMutexLock::New();
43  //m_StopStreamingThreadMutex = itk::FastMutexLock::New();
44  //m_ThreadId = 0;
45  m_IsStreaming = false;
46 
47  // Create a command object. The function will be called later from the main thread
48  this->m_StopStreamingCommand = ProviderCommand::New();
49  m_StopStreamingCommand->SetCallbackFunction(this,
51 
52  this->m_StreamingCommand = ProviderCommand::New();
53  m_StreamingCommand->SetCallbackFunction(this,
55 }
56 
58 {
60  //this->m_StopStreamingThreadMutex->Lock();
61  //this->m_StopStreamingThread = true;
62  //this->m_StopStreamingThreadMutex->Unlock();
63  //if ( m_ThreadId >= 0)
64  //{
65  // this->m_MultiThreader->TerminateThread(m_ThreadId);
66  //}
67  this->InvokeEvent(StreamingStartRequiredEvent());
68 }
69 
71 {
72 
74 
75  if (this->GetInput() != nullptr)
76  {
77  igtl::MessageBase::Pointer curMessage = this->GetInput()->GetMessage();
78  if (dynamic_cast<igtl::TrackingDataMessage*>(curMessage.GetPointer()) != nullptr)
79  {
80  igtl::TrackingDataMessage* tdMsg =
81  (igtl::TrackingDataMessage*)(curMessage.GetPointer());
82  igtl::TrackingDataElement::Pointer trackingData = igtl::TrackingDataElement::New();
83  tdMsg->GetTrackingDataElement(0, trackingData);
84  float x_pos, y_pos, z_pos;
85  trackingData->GetPosition(&x_pos, &y_pos, &z_pos);
86  }
87  }
88 }
89 
91 {
92  if (this->m_IGTLDevice.IsNull())
93  return;
94 
95  for (unsigned int index = 0; index < this->GetNumberOfIndexedInputs(); index++)
96  {
97  mitk::IGTLMessage::Pointer msg = const_cast<mitk::IGTLMessage*>(this->GetInput(index));
98  if (msg == nullptr)
99  {
100  continue;
101  }
102 
103  if ( !msg->IsDataValid() )
104  {
105  continue;
106  }
107 
108  this->m_IGTLDevice->SendMessage(msg);
109  }
110 }
111 
113 {
114  //if outputs are set then delete them
115  if (this->GetNumberOfOutputs() > 0)
116  {
117  for (int numOP = this->GetNumberOfOutputs() - 1; numOP >= 0; numOP--)
118  this->RemoveOutput(numOP);
119  this->Modified();
120  }
121 
122  //fill the outputs if a valid OpenIGTLink device is set
123  if (m_IGTLDevice.IsNull())
124  return;
125 
126  this->SetNumberOfIndexedOutputs(1);
127  if (this->GetOutput(0) == nullptr)
128  {
129  DataObjectPointer newOutput = this->MakeOutput(0);
130  this->SetNthOutput(0, newOutput);
131  this->Modified();
132  }
133 }
134 
135 //void mitk::IGTLMessageProvider::UpdateOutputInformation()
136 //{
137 // this->Modified(); // make sure that we need to be updated
138 // Superclass::UpdateOutputInformation();
139 //}
140 
141 
143 {
144 
145 }
146 
148 {
149  //in case the provider is streaming at the moment we have to stop it
150  if (m_IsStreaming)
151  {
152  MITK_DEBUG("IGTLMessageProvider") << "lost connection, stop streaming";
154  }
155 }
156 
/**
 * \brief Strips a leading request prefix from an OpenIGTLink device type.
 *
 * The recognized prefixes are "GET_", "STT_", "STP_" and "RTS_". If
 * \a requestType starts with one of them, everything after the prefix is
 * returned (which may be an empty string). A string that does not start with
 * a recognized prefix is returned unchanged. This also covers strings shorter
 * than four characters, for which an unconditional substr(4) would throw
 * std::out_of_range.
 *
 * \param requestType device type of a request, e.g. "STT_TDATA"
 * \return the bare message type, e.g. "TDATA"
 */
std::string RemoveRequestPrefixes(std::string requestType)
{
  static const char* const knownPrefixes[] = { "GET_", "STT_", "STP_", "RTS_" };
  const std::string::size_type prefixLength = 4;

  for (const char* prefix : knownPrefixes)
  {
    // compare() clamps the compared length to the string size, so inputs
    // shorter than the prefix simply do not match instead of throwing
    if (requestType.compare(0, prefixLength, prefix) == 0)
    {
      return requestType.substr(prefixLength);
    }
  }

  // no known prefix: nothing to remove
  return requestType;
}
161 
163 {
164  //get the next command
165  igtl::MessageBase::Pointer curCommand = this->m_IGTLDevice->GetNextCommand();
166  //extract the type
167  const char * requestType = curCommand->GetDeviceType();
168  //check the type
169  std::string reqType(requestType);
170  bool isGetMsg = !reqType.find("GET_");
171  bool isSTTMsg = !reqType.find("STT_");
172  bool isSTPMsg = !reqType.find("STP_");
173 
174  //get the type from the request type (remove STT_, STP_, GET_, RTS_)
175  std::string type = RemoveRequestPrefixes(requestType);
176  //check all microservices if there is a fitting source for the requested type
177  mitk::IGTLMessageSource::Pointer source = this->GetFittingSource(type.c_str());
178  //if there is no fitting source return a RTS message, if there is a RTS
179  //type defined in the message factory send it
180  if ( source.IsNull() )
181  {
182  if ( !this->GetIGTLDevice()->SendRTSMessage(type.c_str()) )
183  {
184  //sending RTS message failed, probably because the type is not in the
185  //message factory
186  MITK_WARN("IGTLMessageProvider") << "Tried to send a RTS message but did "
187  "not succeed. Check if this type ( "
188  << type << " ) was added to the message "
189  "factory. ";
190  }
191  }
192  else
193  {
194  if ( isGetMsg ) //if it is a single value push it into sending queue
195  {
196  //first it is necessary to update the source. This needs additional time
197  //but is necessary. But are we really allowed to call this here? In which
198  //thread are we? Is the source thread safe?
199  source->Update();
200  mitk::IGTLMessage::Pointer sourceOutput = source->GetOutput();
201  if (sourceOutput.IsNotNull() && sourceOutput->IsDataValid())
202  {
203  if ( source.IsNotNull() )
204  {
205  this->GetIGTLDevice()->SendMessage(sourceOutput);
206  }
207  }
208  }
209  else if ( isSTTMsg )
210  {
211  //read the requested frames per second
212  int fps = 10;
213 
214  //read the fps from the command
215  igtl::MessageBase* curCommandPt = curCommand.GetPointer();
216  if ( std::strcmp( curCommand->GetDeviceType(), "STT_BIND" ) == 0 )
217  {
218  fps = ((igtl::StartBindMessage*)curCommandPt)->GetResolution();
219  }
220  else if ( std::strcmp( curCommand->GetDeviceType(), "STT_QTDATA" ) == 0 )
221  {
222  fps = ((igtl::StartQuaternionTrackingDataMessage*)curCommandPt)->GetResolution();
223  }
224  else if ( std::strcmp( curCommand->GetDeviceType(), "STT_TDATA" ) == 0 )
225  {
226  fps = ((igtl::StartTrackingDataMessage*)curCommandPt)->GetResolution();
227  }
228 
229  this->StartStreamingOfSource(source, fps);
230  }
231  else if ( isSTPMsg )
232  {
233  this->StopStreamingOfSource(source);
234  }
235  else
236  {
237  //do nothing
238  }
239  }
240 }
241 
243 {
244  return m_IsStreaming;
245 }
246 
248  unsigned int fps)
249 {
250  if ( src == nullptr )
251  return;
252 
253  //so far the provider allows the streaming of a single source only
254  //if the streaming thread is already running return a RTS message
255  if ( !m_IsStreaming )
256  {
257  //if it is a stream establish a connection between the provider and the
258  //source
259  this->ConnectTo(src);
260 
261  // calculate the streaming time
262  this->m_StreamingTimeMutex->Lock();
263  this->m_StreamingTime = 1.0 / (double) fps * 1000.0;
264  this->m_StreamingTimeMutex->Unlock();
265 
271  //this->m_ThreadId = m_MultiThreader->SpawnThread(this->TimerThread, this);
272 
274  this->m_StreamingCommand);
275 
276  this->m_IsStreaming = true;
277  }
278  else
279  {
280  MITK_WARN("IGTLMessageProvider") << "This provider just supports the "
281  "streaming of one source.";
282  }
283 }
284 
286 {
287  this->InvokeEvent(StreamingStartRequiredEvent());
288 }
289 
291 {
292  this->InvokeEvent(StreamingStopRequiredEvent());
293 }
294 
296 {
297  //this is something bad!!! The streaming thread has to be stopped before the
298  //source is disconnected otherwise it can cause a crash. This has to be added!!
299  this->DisconnectFrom(src);
300 
301  //this->m_StopStreamingThreadMutex->Lock();
302  //this->m_StopStreamingThread = true;
303  //this->m_StopStreamingThreadMutex->Unlock();
304 
306  this->m_StopStreamingCommand);
307 
308  //does this flag need a mutex?
309  this->m_IsStreaming = false;
310 }
311 
313 {
314  // \todo remove all inputs
315 
317  this->m_StopStreamingCommand);
318 
319  //does this flag need a mutex?
320  this->m_IsStreaming = false;
321 }
322 
323 mitk::IGTLMessageSource::Pointer mitk::IGTLMessageProvider::GetFittingSource(const char* requestedType)
324 {
325  //get the context
326  us::ModuleContext* context = us::GetModuleContext();
327  //define the interface name
328  std::string interface = mitk::IGTLMessageSource::US_INTERFACE_NAME;
329  //specify a filter that defines the requested type
330  std::string filter = "(" + mitk::IGTLMessageSource::US_PROPKEY_DEVICETYPE +
331  "=" + requestedType + ")";
332  //find the fitting service
333  std::vector<us::ServiceReferenceU> serviceReferences =
334  context->GetServiceReferences(interface, filter);
335 
336  //check if a service reference was found. It is also possible that several
337  //services were found. This is not checked here, just the first one is taken.
338  if ( serviceReferences.size() )
339  {
340  mitk::IGTLMessageSource::Pointer curSource =
341  context->GetService<mitk::IGTLMessageSource>(serviceReferences.front());
342 
343  if ( curSource.IsNotNull() )
344  return curSource;
345  }
346  //no service reference was found or found service reference has no valid source
347  return nullptr;
348 }
349 
350 void mitk::IGTLMessageProvider::Send(mitk::IGTLMessage::Pointer msg)
351 {
352  if (msg != nullptr)
353  {
354  MITK_INFO << "Sending OpenIGTLink Message: " << msg->ToString();
355  this->m_IGTLDevice->SendMessage(msg);
356  }
357 }
358 
359 void
361 {
362  for (DataObjectPointerArraySizeType i = 0;
363  i < UpstreamFilter->GetNumberOfOutputs(); i++)
364  {
365  this->SetInput(i, UpstreamFilter->GetOutput(i));
366  }
367 }
368 
369 void
371 {
372  if (UpstreamFilter == nullptr)
373  return;
374 
375  for (DataObjectPointerArraySizeType i = 0; i < UpstreamFilter->GetNumberOfOutputs(); ++i)
376  {
377  auto input = UpstreamFilter->GetOutput(i);
378 
379  if (input == nullptr)
380  continue;
381 
382  auto nb = this->GetNumberOfIndexedInputs();
383 
384  for (DataObjectPointerArraySizeType i = 0; i < nb; ++i)
385  {
386  if (this->GetInput(i) == input)
387  {
388  this->RemoveInput(i);
389  break;
390  }
391  }
392  }
393 }
394 
395 //ITK_THREAD_RETURN_TYPE mitk::IGTLMessageProvider::TimerThread(void* pInfoStruct)
396 //{
397 // // extract this pointer from thread info structure
398 // struct itk::MultiThreader::ThreadInfoStruct * pInfo =
399 // (struct itk::MultiThreader::ThreadInfoStruct*)pInfoStruct;
400 // mitk::IGTLMessageProvider* thisObject =
401 // static_cast<mitk::IGTLMessageProvider*>(pInfo->UserData);
402 //
403 // itk::SimpleMutexLock mutex;
404 // mutex.Lock();
405 //
406 // thisObject->m_StopStreamingThreadMutex->Lock();
407 // thisObject->m_StopStreamingThread = false;
408 // thisObject->m_StopStreamingThreadMutex->Unlock();
409 //
410 // thisObject->m_StreamingTimeMutex->Lock();
411 // unsigned int waitingTime = thisObject->m_StreamingTime;
412 // thisObject->m_StreamingTimeMutex->Unlock();
413 //
414 // while (true)
415 // {
416 // thisObject->m_StopStreamingThreadMutex->Lock();
417 // bool stopThread = thisObject->m_StopStreamingThread;
418 // thisObject->m_StopStreamingThreadMutex->Unlock();
419 //
420 // if (stopThread)
421 // {
422 // break;
423 // }
424 //
425 // //wait for the time given
426 // //I know it is not the nicest solution but we just need an approximate time
427 // //sleeps for 20 ms
428 // #if defined (WIN32) || defined (_WIN32)
429 // Sleep(waitingTime);
430 // #else
431 // usleep(waitingTime * 1000);
432 // #endif
433 //
434 // // Ask to execute that command from the GUI thread
435 // mitk::CallbackFromGUIThread::GetInstance()->CallThisFromGUIThread(
436 // thisObject->m_StreamingCommand);
437 // }
438 //
439 // thisObject->m_ThreadId = 0;
440 //
441 // mutex.Unlock();
442 //
443 // return ITK_THREAD_RETURN_VALUE;
444 //}
void OnLostConnection() override
This method is called when the IGTL device lost the connection to the other side. ...
#define MITK_INFO
Definition: mitkLogMacros.h:18
Connects a mitk::IGTLDevice to a MITK-OpenIGTLink-Message-Filter-Pipeline.
itk::DataObject::Pointer MakeOutput(DataObjectPointerArraySizeType idx) override
const IGTLMessage * GetInput(void) const
Get the input of this filter.
static void Update(vtkPolyData *)
Definition: mitkSurface.cpp:31
#define MITK_DEBUG
Definition: mitkLogMacros.h:22
DataCollection - Class to facilitate loading/accessing structured data.
mitk::IGTLMessageSource::Pointer GetFittingSource(const char *requestedType)
Looks for microservices that provide messages with the requested type.
mitk::IGTLDevice::Pointer m_IGTLDevice
virtual mitk::IGTLDevice * GetIGTLDevice()
returns the OpenIGTLink device that is used by this filter
void StopStreamingOfSource(mitk::IGTLMessageSource *src)
Stops the streaming of the given message source.
void CreateOutputs()
Create the necessary outputs for the m_IGTLDevice.
void Send(mitk::IGTLMessage::Pointer msg)
sends the msg to the requesting client
bool IsStreaming()
Returns the streaming state.
void StopStreamingOfAllSources()
Stops the streaming of all message sources.
bool SendRTSMessage(const char *type)
Send RTS message of given type.
void StartStreamingOfSource(mitk::IGTLMessageSource *src, unsigned int fps)
Starts the streaming of the given message source with the given fps.
IGTLMessage * GetOutput(void)
return the output (output with id 0) of the filter
#define MITK_WARN
Definition: mitkLogMacros.h:19
A wrapper for the OpenIGTLink message type.
static const std::string US_PROPKEY_DEVICETYPE
void CallThisFromGUIThread(itk::Command *, itk::EventObject *e=nullptr)
Change the current application cursor.
void DisconnectFrom(mitk::IGTLMessageSource *UpstreamFilter)
Disconnects this filter from the outputs of the given IGTLMessageSource.
static CallbackFromGUIThread * GetInstance()
This class is a singleton.
OpenIGTLink message source.
virtual igtl::MessageBase::Pointer GetMessage() const
returns the OpenIGTLink message
virtual void SetInput(unsigned int idx, const IGTLMessage *msg)
Set input with id idx of this filter.
void OnIncomingCommand() override
This method is called when the IGTL device held by this class receives a new command.
void OnIncomingMessage() override
This method is called when the IGTL device held by this class receives a new message.
virtual void SetName(std::string _arg)
Sets the human readable name of this source. There is also a default name, but you can use this metho...
void SendMessage(mitk::IGTLMessage::Pointer msg)
Adds the given message to the sending queue.
void GenerateData() override
filter execute method
static const std::string US_INTERFACE_NAME
These Constants are used in conjunction with Microservices.
void ConnectTo(mitk::IGTLMessageSource *UpstreamFilter)
Connects the input of this filter to the outputs of the given IGTLMessageSource.
static ModuleContext * GetModuleContext()
Returns the module context of the calling module.
std::string RemoveRequestPrefixes(std::string requestType)