线程模型

总结

single(ids)

单工作线程完成所有工作。首个模块完成抓包,其他模块依次处理,没有后续队列。

workers(ids)

根据监听网卡数量和每个网卡可启用的并行抓包线程数量确定工作线程数量。每个工作线程与single模式单线程工作流程一样,互不影响。

autofp(ids)

两种数据包处理线程,分别是收包线程和检测线程。收包线程和检测线程间通过PacketQueue传递数据包进行处理,每个检测线程对应一个队列,多个检测线程时需要为数据包选择队列以确保同一个流的数据包按顺序传递给同一个检测线程。

主线程

  • 注册runmode,填充runmodes
    • main函数中调用RunModeRegisterRunModes函数完成所有runmode的注册,填充了所有runmode的结构体RunMode,这里参考suricata 4.0.3 runmode
  • 注册队列处理程序,填充tmqh_table
    • main函数中由PostConfLoadedSetup函数中调用TmqhSetup函数实现。填充了队列处理程序结构体Tmqh,填充队列处理程序name分别为
      • “simple”
      • “nfq”
      • “packetpool”
      • “flow”
  • 注册线程模块,填充tmm_modules
    • main函数中由PostConfLoadedSetup函数中调用RegisterAllModules函数实现。每种抓包模式有receive和decode两种功能模块,IPS模式的抓包模型另外有一个verdict功能模块,其他还有公用的各种模块,名字列举如下。
      • “UnixManager”
      • “FlowManager”
      • “FlowRecycler”
      • “FlowWorker”
      • “RespondReject”
      • “ReceivePcap”
      • “DecodePcap”
      • “ReceiveAFP”
      • “DecodeAFP”
      • ······
  • 运行所有线程模块的Init回调函数
    • main函数中由PostConfLoadedSetup函数中调用TmModuleRunInit函数实现,但是这里调试确认没有任何模块注册了这个回调函数。
  • 执行RunMode结构体成员回调函数RunModeFunc
    • main函数中由RunModeDispatch函数确定当前应该选取的RunMode项,而后调用。
    • 不同runmode下回调函数针对当前抓包模式和线程模型创建适当数量的ThreadVars并挂载相应的TmModole,而后对每个ThreadVars启动一个数据包处理线程(TVT_PPT),下面详细描述。
  • 创建flow mangaer、flow recycler、state wakeup、state management共四个管理线程(TVT_MGMT)。
    • RunModeDispatch函数末尾执行。
  • 创建unix manager命令接收线程(TVT_CMD)
    • main函数中在RunModeDispatch函数后执行。

工作线程

针对不同的线程模型,有不同的线程组织关系

single(ids)

单工作线程完成所有工作。首个模块完成抓包,其他模块依次处理,没有后续队列。

  • 线程数量:1
  • 线程模块
    • 该抓包模式的receive功能模块
    • 该抓包模式的decode功能模块
    • “FlowWorker”模块
    • “RespondReject”模块
  • 线程入口
    • “pktacqloop” TmThreadsSlotPktAcqLoop
  • inq
    • 无意义
  • tmqh_in
    • 无意义
  • outctx
    • 无意义
  • outq
    • 无意义
  • tmqh_out
    • packetpool队列处理程序的OutHandler,清理并释放。

single模式线程如图

workers(ids)

每个工作线程与single模式单线程工作流程一样,互不影响。

  • 线程数量:网卡接口数量 * 每个网卡接口可并发的抓包线程数。
  • 线程模块:内部与single模式一致。

workers模式线程如图

autofp(ids)

两种数据包处理线程,分别是收包线程和检测线程。收包线程和检测线程见通过PacketQueue传递数据包进行处理,每个检测线程对应一个队列,多个检测线程时需要为数据包选择队列以确保同一个流的数据包按顺序传递给同一个检测线程。

  • 收包线程
    • 线程数量:网卡接口数量 * 每个网卡接口可并发的抓包线程数。
    • 线程模块
      • 该抓包模式的receive功能模块
      • 该抓包模式的decode功能模块
    • 线程入口
      • “pktacqloop” TmThreadsSlotPktAcqLoop
    • inq
      • 无意义
    • tmqh_in
      • 无意义
    • outctx
      • 存储了worker线程数量相等的PacketQueue,用于将数据包交于对应的worker线程进行检测。
    • outq
      • 无意义
    • tmqh_out
      • flow队列处理程序的OutHandler,根据数据包流调度算法hash或ippaire将数据包放入相应队列。
  • 检测线程
    • 线程数量
      1. 若配置文件项threading.set-cpu-affinity为TRUE时,线程数量为threading.worker-cpu-set.threads。否则为0。
      2. 若线程数量为0,则线程数量修改为 cpu数量 * 配置文件项threading.detect-thread-ratio
      3. 若线程数量小于1,则取1
      4. 若线程数量大于1024,则取1024
    • 线程模块
      • “FlowWorker”模块
      • “RespondReject”模块
    • 线程入口
      • “varslot” TmThreadsSlotVar
    • inq
      • 一个实际的Tmq,存储了该检测线程应当读取的trans_q数据包队列数组中的项目序号。
    • tmqh_in
      • flow队列处理程序的InHandler,从inq标识的数据包队列中读取数据包。
    • outctx
      • 无意义
    • outq
      • 无意义
    • tmqh_out
      • packetpool队列处理程序的OutHandler,清理并释放。

autofp模式线程如图

线程入口函数

参数”pktacqloop”

TmThreadsSlotPktAcqLoop

  • 阻塞SIGUSR2信号。
  • 生效线程名。
  • 生效thread_setup_flasgs.
  • 在线程私有数据(TSD)中创建一个PktPool,并创建max_pending_packets(默认为1024,最大为65534,配置文件可配置)数量的packet,链接到该池子。用于存储捕获到的数据包,并减少内存反复分配。
  • 按顺序运行每个TmSlot中的SlotThreadInit函数指针(也就是TmModule中的ThreadInit函数指针),传递slot_initdata也就是网卡接口配置作为初始化参数,传递slot_data的地址用以设置该成员作为初始化的输出。
  • 按顺序初始化每个TmSlot中的slot_pre_pq和slot_post_pq成员,并初始化了这两个PacketQueue中的mutex锁。
  • 配置ThreadVars的stream_pq成员。TODO
  • 好像配置了统计相关的东西。TODO
  • 循环调用首个TmSlot的PktAcqLoop函数指针,三个参数分别是ThreadVars实例,刚刚初始化后的slot_data,TmSlot实例自身。
    • 线程内通过调用TmThreadsSlotProcessPkt对每一个数据包进行处理。
    • TmThreadsSlotProcessPkt内调用TmThreadsSlotVarRun将数据包从传入的TmSlot(这里应该已经是抓包模块之后的模块了,也就是解码模块)开始处理。
      • TmThreadsSlotVarRun内调用TmSlot的SlotFunc函数指针处理数据包。
      • SlotFunc处理完成后,检查TmSlot的slot_pre_pq,若不为空则调递归调用TmThreadsSlotVarRun从下一个TmSlot开始处理slot_pre_pq中的数据包,处理完成后调用tmqh_out释放数据包。
      • 循环处理直到清空slot_pre_pq。
      • 重复上述过程,直到所有TmSlot都经过上述处理。
    • TmThreadsSlotVarRun返回后调用tmqh_out队列处理程序将数据包释放。
    • 数据包释放后检查TmSlot的slot_post_pq项,若队列中有数据包,则调用TmThreadsSlotVarRun将数据包从下一个TmSlot开始处理,处理完成后交由tmqh_out释放,循环处理直到队列清空。
    • 顺序处理每个TmSlot的slot_post_pq项。
  • 每次调用PktAcqLoop后检查是否应该退出,若应退出则跳出循环。
  • 跳出循环后又是统计相关。TODO
  • 看起来是处理流过期的。TODO
  • 销毁PktPool内的Packet及其他相关内存。
  • 按顺序调用每个TmSlot的SlotThreadExitPrintStats和SlotThreadDeinit函数指针。
  • ThreadVars实例的stream_pq置NULL。
  • 线程退出。

参数”varslot”

TmThreadsSlotVar

  • 阻塞SIGUSR2信号。
  • 在线程私有数据(TSD)中创建一个PktPool,与”pktacqloop”不同,这里并不预分配Packet。
  • 生效线程名。
  • 生效thread_setup_flasgs.
  • 按顺序运行每个TmSlot中的SlotThreadInit函数指针(也就是TmModule中的ThreadInit函数指针),这里不需要slot_initdata。
  • 按顺序初始化每个TmSlot中的slot_pre_pq和slot_post_pq成员,并初始化了这两个PacketQueue中的mutex锁。
  • 配置ThreadVars的stream_pq成员。TODO
  • 好像配置了统计相关的东西。TODO
  • 循环调用ThreadVars的tmqh_in获取数据包,对数据包调用TmThreadsSlotVarRun(内部逻辑见”pktacqloop”),后续对数据包和slot_pre_pq以及slot_post_pq队列的处理逻辑与”pktacqloop”处理逻辑一致。
  • 后续逻辑与”pktacqloop”一致。

参数”command”

TmThreadsManagement
TODO

参数”custom”

设置为参数中配置的函数指针

结构体

ThreadVars

线程所有需要的运行参数保存与此,可以看作一个线程实例

threadvars.h中ThreadVars结构定义,对应了一个线程实例。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
/** \brief Per thread variable structure */
typedef struct ThreadVars_ {
pthread_t t;
char name[16];
char *printable_name;
char *thread_group_name;

SC_ATOMIC_DECLARE(unsigned int, flags);

/** TmModule::flags for each module part of this thread */
uint8_t tmm_flags;

/** local id */
int id;

/** queue's */
Tmq *inq;
Tmq *outq;
void *outctx;
const char *outqh_name;

/** queue handlers */
struct Packet_ * (*tmqh_in)(struct ThreadVars_ *);
void (*InShutdownHandler)(struct ThreadVars_ *);
void (*tmqh_out)(struct ThreadVars_ *, struct Packet_ *);

/** slot functions */
void *(*tm_func)(void *);
struct TmSlot_ *tm_slots;

/** stream packet queue for flow time out injection */
struct PacketQueue_ *stream_pq;

uint8_t thread_setup_flags;

/** the type of thread as defined in tm-threads.h (TVT_PPT, TVT_MGMT) */
uint8_t type;

uint16_t cpu_affinity; /** cpu or core number to set affinity to */
uint16_t rank;
int thread_priority; /** priority (real time) for this thread. Look at threads.h */

/* counters */

/** public counter store: counter syncs update this */
StatsPublicThreadContext perf_public_ctx;

/** private counter store: counter updates modify this */
StatsPrivateThreadContext perf_private_ctx;

SCCtrlMutex *ctrl_mutex;
SCCtrlCondT *ctrl_cond;

uint8_t cap_flags; /**< Flags to indicate the capabilities of all the
TmModules resgitered under this thread */
struct ThreadVars_ *next;
struct ThreadVars_ *prev;
} ThreadVars;

ThreadVars中各字段含义如下:

  • t
    • 线程句柄
  • name
    • 线程短名字,会设置到线程名中。
  • printable_name
    • 线程完整名字,基本没用,目前看counter中好像有使用,不确定。
  • thread_group_name
    • 完全没有用。
  • flags
    • 标记线程当前状态,通过宏扩展成flags_sc_atomic。
  • tmm_flags
    • 线程下所有挂载的TmModule的flags成员的或集,用于标记线程所具备的功能。
  • id
    • 注册到thread_store中的序号,1开始。
  • inq
    • 输入数据包的队列Tmq,成员中的id标记了当前线程的输入数据包队列应该使用trans_q数组中的第几项,与tmqh_in配合。数据包捕获线程未使用该项。autofp(ids)模式下worker(检测)线程使用该项。
  • tmqh_in
    • 输入数据包的队列处理程序中的InHandler,用于线程获取需要处理的数据包。autofp(ids)模式下worker(检测)线程使用该项,
  • InShutdownHandler
    • tmqh_in所属的队列处理程序中的InShutdownHandler。
  • outq
    • 好像没有地方使用了该项。
  • outctx
    • autofp线程模型下,数据包捕获线程需要将数据包转交给一组worker(检测)线程,每个捕获线程需要保存所有worker线程的PacketQueue,该项用于存储所有这些PacketQueue。
  • outqh_name
    • 输出数据包的队列处理程序的名字。
  • tmqh_out
    • 输出数据包的队列处理程序中的OutHandler,用于线程将处理完成的数据包通过PacketQueue转交给下一个线程或释放该数据包。
  • tm_func
    • 线程的入口函数,这里根据创建线程实例时传递的参数,可有几种选项。
      • 参数”varslot”,TmThreadsSlotVar
      • 参数”pktacqloop”,TmThreadsSlotPktAcqLoop
      • 参数”command”,TmThreadsManagement
      • 参数”custom”,设置为参数中配置的函数指针
  • tm_slots
    • 线程下所有挂载的TmSlot结构链表,其实也就是所有关联的TmModule。
  • stream_pq
  • thread_setup_flags
    • 枚举型,标记当前线程在启动后是否需要配置优先级和cpu亲和度。共三种可选
      • THREAD_SET_AFFINITY
      • THREAD_SET_PRIORITY
      • THREAD_SET_AFFTYPE
  • type
    • 标记线程类型,根据该类型将线程连接到全局变量数组tv_root下,同类型线程用prev和next成员连接成链表。共三种
      • TVT_PPT 数据包处理线程
      • TVT_MGMT 管理线程
      • TVT_CMD 指令接收线程
  • cpu_affinity
    • 当thread_setup_flags为THREAD_SET_AFFINITY时,这里保存的是线程所需要配置的cpu亲和度。
    • 当thread_setup_flags为THREAD_SET_AFFTYPE时,这里是枚举型,标记当前线程配置cpu亲和度所处的cpu集合类型。共四种可选。
      • RECEIVE_CPU_SET
      • WORKER_CPU_SET
      • VERDICT_CPU_SET
      • MANAGEMENT_CPU_SET
  • rank
  • thread_priority
  • perf_public_ctx
  • perf_private_ctx
  • ctrl_mutex
  • ctrl_cond
  • cap_flags
    • 线程下所有挂载的TmModule的cap_flags成员的或集,用于标记线程工作所需要的所有能力(权限)。
  • next
    • 在全局变量数组tv_root下,同类型线程连接成链表
  • prev
    • 在全局变量数组tv_root下,同类型线程连接成链表
tm-threads.c中定义有thread_store,注册了所有生成的ThreadVars实例,看起来可以重复使用ThreadVars。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
typedef struct Thread_ {
ThreadVars *tv; /**< threadvars structure */
const char *name;
int type;
int in_use; /**< bool to indicate this is in use */

struct timeval ts; /**< current time of this thread (offline mode) */
} Thread;

typedef struct Threads_ {
Thread *threads;
size_t threads_size;
int threads_cnt;
} Threads;

static Threads thread_store = { NULL, 0, 0 };

ThreadVars关联的部分结构

TmSlot

一个TmSlot结构可以看作一个TmModule结构挂载到ThreadVars时的运行时形态。

tm-threads.h 中定义有TmSlot结构。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
typedef TmEcode (*TmSlotFunc)(ThreadVars *, Packet *, void *, PacketQueue *,
PacketQueue *);

typedef struct TmSlot_ {
/* the TV holding this slot */
ThreadVars *tv;

/* function pointers */
SC_ATOMIC_DECLARE(TmSlotFunc, SlotFunc);

TmEcode (*PktAcqLoop)(ThreadVars *, void *, void *);

TmEcode (*SlotThreadInit)(ThreadVars *, const void *, void **);
void (*SlotThreadExitPrintStats)(ThreadVars *, void *);
TmEcode (*SlotThreadDeinit)(ThreadVars *, void *);

/* data storage */
const void *slot_initdata;
SC_ATOMIC_DECLARE(void *, slot_data);

/* queue filled by the SlotFunc with packets that will
* be processed futher _before_ the current packet.
* The locks in the queue are NOT used */
PacketQueue slot_pre_pq;

/* queue filled by the SlotFunc with packets that will
* be processed futher _after_ the current packet. The
* locks in the queue are NOT used */
PacketQueue slot_post_pq;

/* store the thread module id */
int tm_id;

/* slot id, only used my TmVarSlot to know what the first slot is */
int id;

/* linked list, only used when you have multiple slots(used by TmVarSlot) */
struct TmSlot_ *slot_next;

/* just called once, so not perf critical */
TmEcode (*Management)(ThreadVars *, void *);

} TmSlot;

TmSlot中各成员含义如下:

  • tv
    • 归属的ThreadVar
  • SlotFunc
    • TmModule中的Func函数,用于数据包处理。为什么用原子类型声明,我觉得可能是因为有病吧。
  • PktAcqLoop
    • TmModule中的同名PktAcqLoop函数,用于数据包捕获。
  • SlotThreadInit
    • TmModule中的ThreadInit函数。
  • SlotThreadExitPrintStats
    • TmModule中的ThreadExitPrintStats函数。
  • SlotThreadDeinit
    • TmModule中的ThreadDeinit函数。
  • slot_initdata
    • 配置文件中该抓包模式下特定网卡配置参数的内存存储。每种抓包模式都有自己所需要的配置参数,并且对每个网卡接口都可以不同,每个线程只能对一个网卡抓包,这里记录了该网卡接口的配置参数。此项只对收包模块有意义。
  • slot_data
    • 线程入口函数中调用SlotThreadInit,将初始化生成的后续需要的数据存储在slot_data该项。
  • slot_pre_pq
    • 这是一个PacketQueue,当SlotFunc被调用以处理数据包时,可能会生成新的需要处理的伪造数据包,插入slot_pre_pq队列的伪造数据包将先于原有数据包从下一个TmSlot开始处理。当原数据包经过所有TmSlot处理且所有TmSlot的slot_pre_pq都清空后,原数据包由tmqh_out交给下一个线程或释放。目前看到IP数据包分片的处理需要这个队列。
  • slot_post_pq
    • 这是一个PacketQueue,当SlotFunc被调用以处理数据包时,可能会生成新的需要处理的伪造数据包,插入slot_post_pq队列的伪造数据包将晚于原有数据包进入下一个TmSlot开始处理。当原数据包由tmqh_out交给下一个线程或释放后,才能开始slot_post_pq的处理。目前未看到需要这个队列的场景。
  • tm_id
    • TmModule在tmm_modules中注册的id。
  • id
    • 挂载到ThreadVars上的顺序号,从0开始。
  • slot_next
    • 用于链接后续TmSlot,构成一个链表。
  • Management
    • TmModule中的同名Management函数,暂时不确定作用。

TmModule

线程模块

tm-modules.h中线程模块TmModule结构定义
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
typedef TmEcode (*ThreadInitFunc)(ThreadVars *, const void *, void **);
typedef TmEcode (*ThreadDeinitFunc)(ThreadVars *, void *);
typedef void (*ThreadExitPrintStatsFunc)(ThreadVars *, void *);

typedef struct TmModule_ {
const char *name;

/** thread handling */
TmEcode (*ThreadInit)(ThreadVars *, const void *, void **);
void (*ThreadExitPrintStats)(ThreadVars *, void *);
TmEcode (*ThreadDeinit)(ThreadVars *, void *);

/** the packet processing function */
TmEcode (*Func)(ThreadVars *, Packet *, void *, PacketQueue *, PacketQueue *);

TmEcode (*PktAcqLoop)(ThreadVars *, void *, void *);

/** terminates the capture loop in PktAcqLoop */
TmEcode (*PktAcqBreakLoop)(ThreadVars *, void *);

TmEcode (*Management)(ThreadVars *, void *);

/** global Init/DeInit */
TmEcode (*Init)(void);
TmEcode (*DeInit)(void);

void (*RegisterTests)(void);

uint8_t cap_flags; /**< Flags to indicate the capability requierment of
the given TmModule */
/* Other flags used by the module */
uint8_t flags;
} TmModule;

TmModule tmm_modules[TMM_SIZE];
tm-threads-common.h中定义有所有的线程模块id。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
/** \brief Thread Model Module id's.
*
* \note anything added here should also be added to TmModuleTmmIdToString
* in tm-modules.c
*/
typedef enum {
TMM_FLOWWORKER,
TMM_DECODENFQ,
TMM_VERDICTNFQ,
TMM_RECEIVENFQ,
TMM_RECEIVEPCAP,
TMM_RECEIVEPCAPFILE,
TMM_DECODEPCAP,
TMM_DECODEPCAPFILE,
TMM_RECEIVEPFRING,
TMM_DECODEPFRING,
TMM_RESPONDREJECT,
TMM_DECODEIPFW,
TMM_VERDICTIPFW,
TMM_RECEIVEIPFW,
TMM_RECEIVEERFFILE,
TMM_DECODEERFFILE,
TMM_RECEIVEERFDAG,
TMM_DECODEERFDAG,
TMM_RECEIVEAFP,
TMM_DECODEAFP,
TMM_RECEIVENETMAP,
TMM_DECODENETMAP,
TMM_ALERTPCAPINFO,
TMM_RECEIVEMPIPE,
TMM_DECODEMPIPE,
TMM_RECEIVENAPATECH,
TMM_DECODENAPATECH,
TMM_STATSLOGGER,
TMM_RECEIVENFLOG,
TMM_DECODENFLOG,

TMM_FLOWMANAGER,
TMM_FLOWRECYCLER,
TMM_DETECTLOADER,

TMM_UNIXMANAGER,

TMM_SIZE,
} TmmId;

Tmq

线程队列,与数据包队列不同,Tmq用于定位线程的输入或输出使用哪个或哪些具体的PacketQueue。

tm-queues.h中Tmq结构定义
1
2
3
4
5
6
7
8
typedef struct Tmq_ {
char *name;
uint16_t id;
uint16_t reader_cnt;
uint16_t writer_cnt;
/* 0 for packet-queue and 1 for data-queue */
uint8_t q_type;
} Tmq;

Tmq中各成员含义如下:

  • name
    • 创建队列时使用的名字,队列的上下游使用名字以确定同一个队列。
  • id
    • 顺序增长的id,标识了这个Tmq所对应的PacketQueue位于全局trans_q数组的哪一项上。trans_q是一个PacketQueue数组。
  • reader_cnt
    • 标识了该队列有几个读者。
  • writer_cnt
    • 标识了该队列有几个写者。
  • q_type
    • 标识了队列是数据包队列还是数据队列,貌似是cuda使用的,不确定。
tm-queues.c中同样有一个存储了所有Tmq的数组,除了预先分配了Tmq的空间,并没有什么用。
1
2
3
4
#define TMQ_MAX_QUEUES 256

static uint16_t tmq_id = 0;
static Tmq tmqs[TMQ_MAX_QUEUES];

Tmqh

队列处理程序,线程对数据包的输入和输出,由队列处理程序负责,比如ThreadVars中的tmqh_in负责从PacketQueue中取得上游线程输出的Packet,tmqh_out负责将本线程输出的数据包交付给下一阶段。
输入的特殊情况是,抓包线程的输入数据包由相应的receive模块获得,因此tmqh_in无意义。
输出的特殊情况是,最后一个工作线程由于没有下一阶段,因此tmqh_out的实际功能是将数据包返回池子或释放。

tm-queuehandlers.h中Tmqh结构定义
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
enum {
TMQH_SIMPLE,
TMQH_NFQ,
TMQH_PACKETPOOL,
TMQH_FLOW,

TMQH_SIZE,
};

typedef struct Tmqh_ {
const char *name;
Packet *(*InHandler)(ThreadVars *);
void (*InShutdownHandler)(ThreadVars *);
void (*OutHandler)(ThreadVars *, Packet *);
void *(*OutHandlerCtxSetup)(const char *);
void (*OutHandlerCtxFree)(void *);
void (*RegisterTests)(void);
} Tmqh;

Tmqh tmqh_table[TMQH_SIZE];

“packetpool”

tmqh-packetpool.c中"packetpool"队列处理程序注册
1
2
3
4
5
6
void TmqhPacketpoolRegister (void)
{
tmqh_table[TMQH_PACKETPOOL].name = "packetpool";
tmqh_table[TMQH_PACKETPOOL].InHandler = TmqhInputPacketpool;
tmqh_table[TMQH_PACKETPOOL].OutHandler = TmqhOutputPacketpool;
}
  • InHandler
    TmqhInputPacketpool,从当前线程的线程私有数据区域的PktPool中取Packet。
  • OutHandler
    TmqhOutputPacketpool,如果Packet是隧道类型数据包,针对隧道类型做一些处理。之后将Packet释放,是释放到其所属线程的PktPool中还是释放内存取决于其ReleasePacket函数指针。

“flow”

tmqh-flow.c中"flow"队列处理程序注册
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
void TmqhFlowRegister(void)
{
tmqh_table[TMQH_FLOW].name = "flow";
tmqh_table[TMQH_FLOW].InHandler = TmqhInputFlow;
tmqh_table[TMQH_FLOW].OutHandlerCtxSetup = TmqhOutputFlowSetupCtx;
tmqh_table[TMQH_FLOW].OutHandlerCtxFree = TmqhOutputFlowFreeCtx;
tmqh_table[TMQH_FLOW].RegisterTests = TmqhFlowRegisterTests;

const char *scheduler = NULL;
if (ConfGet("autofp-scheduler", &scheduler) == 1) {
if (strcasecmp(scheduler, "round-robin") == 0) {
SCLogNotice("using flow hash instead of round robin");
tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowHash;
} else if (strcasecmp(scheduler, "active-packets") == 0) {
SCLogNotice("using flow hash instead of active packets");
tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowHash;
} else if (strcasecmp(scheduler, "hash") == 0) {
tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowHash;
} else if (strcasecmp(scheduler, "ippair") == 0) {
tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowIPPair;
} else {
SCLogError(SC_ERR_INVALID_YAML_CONF_ENTRY, "Invalid entry \"%s\" "
"for autofp-scheduler in conf. Killing engine.",
scheduler);
exit(EXIT_FAILURE);
}
} else {
tmqh_table[TMQH_FLOW].OutHandler = TmqhOutputFlowHash;
}

return;
}
  • InHandler
    TmqhInputFlow,确定当前读取的PacketQueue为&trans_q[tv->inq->id],取出Packet。
  • OutHandlerCtxSetup
    TmqhOutputFlowSetupCtx,通过传入的逗号分隔的一组队列名,创建相应的一组队列,存储到结构体TmqhFlowCtx中并将结构体指针返回后存储在ThreadVars成员outctx中。
  • OutHandlerCtxFree
    TmqhOutputFlowFreeCtx,释放TmqhOutputFlowSetupCtx分配出的内存。主线程中被调用。
  • OutHandler
    通过配置文件的autofp-scheduler项选择使用的数据包分配调度策略,”hash”和”ippaire”是有效选项,旧的不在有效的选项和未配置该项都会生效为”hash”。
    • TmqhOutputFlowHash,以Packet的flow_hash项选择ThreadVars成员outctx中的一个PacketQueue,将数据包入队列交付给相应的worker线程处理。
    • TmqhOutputFlowIPPair,以数据包的源和目的地址两项选择ThreadVars成员outctx中的一个PacketQeueue,将数据包入队列交付给相应的worker线程处理。

“simple”

tmqh-simple.c中"simple"队列处理程序注册
1
2
3
4
5
6
7
void TmqhSimpleRegister (void)
{
tmqh_table[TMQH_SIMPLE].name = "simple";
tmqh_table[TMQH_SIMPLE].InHandler = TmqhInputSimple;
tmqh_table[TMQH_SIMPLE].InShutdownHandler = TmqhInputSimpleShutdownHandler;
tmqh_table[TMQH_SIMPLE].OutHandler = TmqhOutputSimple;
}

“nfq”

tmqh-nfq.c中"nfq"队列处理程序注册
1
2
3
4
5
6
void TmqhNfqRegister (void)
{
tmqh_table[TMQH_NFQ].name = "nfq";
tmqh_table[TMQH_NFQ].InHandler = NULL;
tmqh_table[TMQH_NFQ].OutHandler = TmqhOutputVerdictNfq;
}

Packet

数据包,suricata内通用的数据包结构,不同抓包模式获取的数据包都封装为此结构,其中保存了实际捕获的数据包数据和数据长度,prev和next结构使其可构成链表以存入PacketQueue,pool指出了所属的池子,ReleasePacket函数指针用以确定数据包释放的具体方式(比如返回池子或直接释放内存)。

decode.h 中Packet的定义
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
typedef struct Packet_
{
/*
* .........
* .........
* something
* .........
* .........
*/

/* double linked list ptrs */
struct Packet_ *next;
struct Packet_ *prev;

/* The Packet pool from which this packet was allocated. Used when returning
* the packet to its owner's stack. If NULL, then allocated with malloc.
*/
struct PktPool_ *pool;

} Packet;

PacketQueue

数据包队列,top和bot指示了一个Packet链表的首尾,用于上下游不同数据包处理线程间的数据包传递。由于是多线程并发访问,需要锁和信号量。

1
2
3
4
5
6
7
8
9
10
typedef struct PacketQueue_ {
Packet *top;
Packet *bot;
uint32_t len;
#ifdef DBG_PERF
uint32_t dbg_maxlen;
#endif /* DBG_PERF */
SCMutex mutex_q;
SCCondT cond_q;
} PacketQueue;

PktPool

数据包池,Packet的池子,可以预先分配一定数量的Packet,以降低后期创建Packet的内存操作代价。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
/* Return stack, onto which other threads free packets. */
typedef struct PktPoolLockedStack_{
/* linked list of free packets. */
SCMutex mutex;
SCCondT cond;
SC_ATOMIC_DECLARE(int, sync_now);
Packet *head;
} __attribute__((aligned(CLS))) PktPoolLockedStack;

typedef struct PktPool_ {
/* link listed of free packets local to this thread.
* No mutex is needed.
*/
Packet *head;
/* Packets waiting (pending) to be returned to the given Packet
* Pool. Accumulate packets for the same pool until a theshold is
* reached, then return them all at once. Keep the head and tail
* to fast insertion of the entire list onto a return stack.
*/
struct PktPool_ *pending_pool;
Packet *pending_head;
Packet *pending_tail;
uint32_t pending_count;

#ifdef DEBUG_VALIDATION
int initialized;
int destroyed;
#endif /* DEBUG_VALIDATION */

/* All members above this point are accessed locally by only one thread, so
* these should live on their own cache line.
*/

/* Return stack, where other threads put packets that they free that belong
* to this thread.
*/
PktPoolLockedStack return_stack;
} PktPool;

PktPool中各成员含义如下:

  • head
    • 挂载当前池子可用的数据包结构,线程操作私有池子时不需要加锁。
  • pending_pool
    • 需要归还的数据包属于其他池子时,可挂载到当前线程的pending_head上,并用pending_pool标记出pending_head上数据包所属的其他线程的池子,只能pending一个其他线程的数据包。当pending数量达到配置上限或那个池子标记了需要同步时将pending的数据包整体归还。
  • pending_head
  • pending_tail
  • pending_count
  • return_stack
    • 其他线程需要将packet归还到本线程时,会将packet链接到return_stack上。本线程需要取得未使用的Packet且head上已经为空时,会尝试将return_stack上的链表切换到head上。由于有多线程操作,return_stack内包含锁和信号量。