§Asynchronous event support

Plugins with the async events capability can enrich an event stream from a given source (not necessarily one implemented by the plugin itself) by injecting events into the stream asynchronously. This can be used to implement notification systems or to record state transitions in the event-driven model of the Falcosecurity libraries, making them available to other components at runtime or when the event stream is replayed from a capture file.

For example, the Falcosecurity libraries use this feature internally to implement metadata enrichment systems, such as the one for container runtimes. In that case, the libraries run asynchronous jobs that retrieve the information from external sources outside the main event processing loop, so that the lookup does not block it. The worker jobs produce a notification event every time a new container is detected and inject it asynchronously into the system event stream, where it is later processed for state updates and for evaluating Falco rules.
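The pattern described above can be modelled with the standard library alone. The following is only a rough sketch, not how the Falcosecurity libraries are implemented: StreamEvent, spawn_worker and run_main_loop are hypothetical names invented for this illustration, and a plain channel stands in for the real injection mechanism.

```rust
use std::sync::mpsc::{self, Receiver, Sender};
use std::thread::{self, JoinHandle};

/// Hypothetical event type for this sketch; not a falco_plugin type.
#[derive(Debug, PartialEq)]
enum StreamEvent {
    /// An event produced by the primary source (e.g. a syscall).
    Source(u64),
    /// A notification injected asynchronously by a background worker.
    Async { name: &'static str, data: String },
}

/// Spawns a worker that produces one notification per discovered
/// container, outside the main event processing loop.
fn spawn_worker(tx: Sender<StreamEvent>, containers: Vec<String>) -> JoinHandle<()> {
    thread::spawn(move || {
        for id in containers {
            // A real worker would query a container runtime here (slow I/O).
            let event = StreamEvent::Async { name: "container", data: id };
            // Stop early if the main loop has gone away.
            if tx.send(event).is_err() {
                break;
            }
        }
    })
}

/// Main loop: handles source events and picks up whatever async
/// notifications are already queued, without waiting for the worker.
fn run_main_loop(source: &[u64], injected: &Receiver<StreamEvent>) -> Vec<StreamEvent> {
    let mut stream = Vec::new();
    for &n in source {
        stream.push(StreamEvent::Source(n));
        // try_iter() yields only already-queued items; it never blocks.
        stream.extend(injected.try_iter());
    }
    stream
}
```

The main loop only ever drains what is already queued, so a slow lookup in the worker delays the notification but not the processing of source events.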

For your plugin to support asynchronous events, you will need to implement the async_event::AsyncEventPlugin trait and invoke the async_event_plugin! macro, for example:

use std::ffi::{CStr, CString};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread::JoinHandle;
use anyhow::Error;
use falco_event::events::Event;
use falco_event::events::EventMetadata;
use falco_plugin::base::{InitInput, Plugin};
use falco_plugin::{async_event_plugin, EventInputExt, FailureReason, plugin};
use falco_plugin::async_event::{AsyncEvent, AsyncEventPlugin, AsyncHandler};

struct MyAsyncPlugin {
    stop_request: Arc<AtomicBool>,
    thread: Option<JoinHandle<Result<(), Error>>>,
}

impl Plugin for MyAsyncPlugin {
    // ...
}

impl AsyncEventPlugin for MyAsyncPlugin {
    const ASYNC_EVENTS: &'static [&'static str] = &[]; // generate any async events
    const EVENT_SOURCES: &'static [&'static str] = &[]; // attach to all event sources

    fn start_async(&mut self, handler: AsyncHandler) -> Result<(), Error> {
        // stop the thread if it was already running
        if self.thread.is_some() {
            self.stop_async()?;
        }

        // start a new thread
        self.stop_request.store(false, Ordering::Relaxed);
        let stop_request = Arc::clone(&self.stop_request);
        self.thread = Some(std::thread::spawn(move || {
            // check the stop flag periodically: we must stop the thread
            // when requested
            while !stop_request.load(Ordering::Relaxed) {
                // build an event
                let event = AsyncEvent {
                    plugin_id: Some(0),
                    name: Some(c"sample_async"),
                    data: Some(b"hello"),
                };

                let metadata = EventMetadata::default();

                let event = Event {
                    metadata,
                    params: event,
                };

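                // submit it to the main event loop
                handler.emit(event)?;

                // pause between events so the loop does not flood the stream
                // (the interval here is arbitrary, for illustration only)
                std::thread::sleep(std::time::Duration::from_millis(100));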
            }
            Ok(())
        }));
        Ok(())
    }

    fn stop_async(&mut self) -> Result<(), Error> {
        self.stop_request.store(true, Ordering::Relaxed);
        let Some(handle) = self.thread.take() else {
            return Ok(());
        };

        match handle.join() {
            Ok(res) => res,
            Err(e) => std::panic::resume_unwind(e),
        }
    }
}

plugin!(MyAsyncPlugin);
async_event_plugin!(MyAsyncPlugin);
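To experiment with the start/stop lifecycle outside a plugin, the same stop-flag-and-join pattern can be reproduced with the standard library only. This is a minimal sketch under stated assumptions: Worker, its emitted counter and the 1 ms pause are hypothetical stand-ins for the plugin state and for handler.emit, and String replaces anyhow::Error to keep the block dependency-free.

```rust
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread::{self, JoinHandle};
use std::time::Duration;

/// Hypothetical stand-in for a plugin's background job state.
struct Worker {
    stop_request: Arc<AtomicBool>,
    /// Counts loop iterations; stands in for emitting an event.
    emitted: Arc<AtomicUsize>,
    thread: Option<JoinHandle<Result<(), String>>>,
}

impl Worker {
    fn new() -> Self {
        Worker {
            stop_request: Arc::new(AtomicBool::new(false)),
            emitted: Arc::new(AtomicUsize::new(0)),
            thread: None,
        }
    }

    fn start(&mut self) -> Result<(), String> {
        // Stop (and join) a previous thread before clearing the flag, so
        // the old thread can never observe the reset and keep running.
        if self.thread.is_some() {
            self.stop()?;
        }
        self.stop_request.store(false, Ordering::Relaxed);
        let stop_request = Arc::clone(&self.stop_request);
        let emitted = Arc::clone(&self.emitted);
        self.thread = Some(thread::spawn(move || {
            while !stop_request.load(Ordering::Relaxed) {
                emitted.fetch_add(1, Ordering::Relaxed);
                thread::sleep(Duration::from_millis(1));
            }
            Ok(())
        }));
        Ok(())
    }

    fn stop(&mut self) -> Result<(), String> {
        self.stop_request.store(true, Ordering::Relaxed);
        match self.thread.take() {
            // Nothing running: stopping is a no-op.
            None => Ok(()),
            Some(handle) => match handle.join() {
                Ok(res) => res,
                // Propagate a panic from the worker thread to the caller.
                Err(panic) => std::panic::resume_unwind(panic),
            },
        }
    }

    fn is_running(&self) -> bool {
        self.thread.is_some()
    }
}
```

Calling start twice in a row restarts the worker, and calling stop when nothing is running returns Ok(()), mirroring the behaviour of start_async and stop_async in the example above.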

Structs§

Traits§