以下是使用日志文件提取的一些技巧。我们正在研究一些 Enterprise Splunk 提取物。我们可以摆弄 Splunk,尝试探索数据。或者我们可以获取简单的提取物并在 Python 中摆弄数据。
在 Python 中运行不同的实验似乎比尝试在 Splunk 中进行这种探索性的摆弄更有效。主要是因为我们可以对数据做什么没有任何限制。我们可以在一个地方创建非常复杂的统计模型。
理论上,我们可以在Splunk中做很多探索。它具有多种报告和分析功能。
但...
使用 Splunk 假定我们知道我们在寻找什么。在许多情况下,我们并不知道我们在寻找什么:我们在探索。我们可能有一些迹象表明一些 RESTful API 事务很慢,但仅此而已。我们如何进行?
第一步是获取 CSV 格式的原始数据。怎么办?
读取原始数据
我们将从使用一些附加功能包装 CSV.DictReader 对象开始。
面向对象的纯粹主义者会反对这种策略。 “为什么不直接扩展 DictReader?”他们问。我没有很好的答案。我倾向于函数式编程和组件的正交性。使用纯 OO 方法,我们必须使用看起来更复杂的混入来实现这一点。
我们处理日志的大体框架是这样的。
with open("somefile.csv") as source:
rdr = csv.DictReader(source)
这使我们能够读取 CSV 格式的 Splunk 提取物。我们可以遍历阅读器中的行。这是技巧#1。这不是 真的 很棘手,但我喜欢它。
with open("somefile.csv") as source:
rdr = csv.DictReader(source)
我们可以——在一定程度上——以有用的格式报告原始数据。如果我们想修饰输出,我们可以更改格式字符串。也许是“{host:30s} {ReponseTime:8s} {source:s}”或类似的东西。
过滤
一个常见的情况是我们提取的太多了,只需要看到一个子集。我们可以更改 Splunk 过滤器,但是,我们讨厌在完成探索之前过度使用。在 Python 中过滤要容易得多。一旦我们了解了我们需要什么,我们就可以在 Splunk 中完成。
with open("somefile.csv") as source:
rdr = csv.DictReader(source)
我们已经注入了一个生成器表达式,它将过滤源行,使我们能够处理有意义的子集。
投影
在某些情况下,我们会有额外的源数据列,我们真的不想使用这些列。我们将通过对每一行进行投影来消除这些数据。
原则上,Splunk 从不生成空列。但是,RESTful API 日志可能会导致数据集包含大量基于代理键的唯一列标题,这些代理键是请求 URI 的一部分。这些列将包含来自使用该代理键的一个请求的一行数据。对于每隔一行,该列中没有任何用处。如果我们从每一行中删除空列,生活就会简单得多。
我们也可以使用生成器表达式来做到这一点,但是它有点长。生成器函数更易于阅读。
with open("somefile.csv") as source:
rdr = csv.DictReader(source)
我们从原始阅读器中的项目子集构建了一个新的行字典。我们可以用它来包装过滤器的输出。
with open("somefile.csv") as source:
rdr = csv.DictReader(source)
这将减少 for 语句内部可见的未使用列。
符号变化
row['source'] 符号会变得笨拙。使用 types.SimpleNamespace 比使用字典要好得多。这允许我们使用 row.source。
这是创建更有用的东西的绝妙技巧。
with open("somefile.csv") as source:
rdr = csv.DictReader(source)
我们可以将其折叠到我们的步骤序列中。
with open("somefile.csv") as source:
rdr = csv.DictReader(source)
请注意我们的 format_map() 方法的小改动。我们添加了 vars() 函数以从 SimpleNamespace 的属性中提取字典。
我们可以把它写成一个函数来保持与其他函数的句法对称性。
with open("somefile.csv") as source:
rdr = csv.DictReader(source)
事实上,我们可以把它写成一个像函数一样使用的 lambda 结构。
with open("somefile.csv") as source:
rdr = csv.DictReader(source)
虽然 ns_reader() 函数和 ns_reader() lambda 的使用方式相同,但为 lambda 编写文档字符串和 doctest 单元测试稍微困难一些。因此,应该避免使用 lambda。
我们可以使用 map(lambda row: types.SimpleNamespace(**row), reader)。有些人更喜欢这个而不是生成器表达式。
我们可以在内部 yield 语句中使用适当的 for 语句,但是从小事做大语句似乎没有任何好处。
我们有很多选择,因为 Python 提供了如此多的函数式编程特性。我们并不经常看到 Python 被吹捧为一种函数式语言。然而,我们有多种方法来处理简单的映射。
映射:转换和派生数据
我们通常会有一个非常明显的数据转换列表。此外,我们将拥有越来越多的派生数据项列表。衍生项目将是动态的,并且基于我们正在测试的不同假设。每次我们进行实验或提出问题时,我们都可能会更改派生数据。
这些步骤中的每一个:过滤、投影、转换和推导,都是 map-reduce 管道的“映射”部分中的阶段。我们可以创建一些较小的函数并使用 map() 应用它们。因为我们正在更新有状态对象,所以我们不能使用通用的 map() 函数。如果我们想要实现更纯粹的函数式编程风格,我们会使用不可变的 namedtuple 而不是可变的 SimpleNamespace。
with open("somefile.csv") as source:
rdr = csv.DictReader(source)
在探索过程中,我们将调整此转换函数的主体。也许我们将从一些最小的转换和推导集开始。我们将用一些“这些是对的吗?”来扩展它。诸如此类的事情。当我们发现不起作用时,我们会取出一些。
我们的整体处理过程是这样的:
with open("somefile.csv") as source:
rdr = csv.DictReader(source)
请注意我们的 for 语句正文中的更改。我们的 convert() 函数产生我们确定的值。我们在 for 循环中添加了一些我们不是 100% 确定的额外变量。在更新 convert() 函数之前,我们将看看它们是否有帮助(甚至正确)。
减少
在减少方面,我们可以采用稍微不同的处理方式。我们需要重构我们之前的例子,把它变成一个生成器函数。
with open("somefile.csv") as source:
rdr = csv.DictReader(source)
我们用 yield 替换了 print() 。
这是重构的另一部分。
with open("somefile.csv") as source:
rdr = csv.DictReader(source)
理想情况下,我们所有的编程看起来都像这样。我们使用生成器函数来生成数据。数据的最终显示完全独立。这使我们能够更自由地重构和更改处理。
现在我们可以做一些事情,比如将行收集到 Counter() 对象中,或者计算一些统计数据。我们可能会使用 defaultdict(list) 按服务对行进行分组。
with open("somefile.csv") as source:
rdr = csv.DictReader(source)
我们决定在这里创建具体的列表对象。我们可以使用 itertools 按服务对响应时间进行分组。它看起来像是正确的函数式编程,但实现指出了函数式编程的 Pythonic 形式的一些局限性。我们要么必须对数据进行排序(创建列表对象),要么必须在对数据分组时创建列表。为了进行多种不同的统计,通常更容易通过创建具体列表来对数据进行分组。
我们现在不是简单地打印一个行对象,而是在做两件事。
- 创建一些局部变量,如 svc 和 m。我们可以很容易地添加方差或其他措施。
- 使用不带参数的 vars() 函数,它会根据局部变量创建一个字典。
这种不带参数的 vars() 的使用——其行为类似于 locals()——是一个方便的技巧。它允许我们简单地创建我们想要的任何局部变量并将它们包含在格式化输出中。我们可以入侵我们认为可能相关的多种不同类型的统计措施。
现在我们的基本处理循环是 for row in converted_log("somefile.csv"),我们可以在一个微小的、易于修改的脚本中探索许多处理备选方案。我们可以探索许多假设来确定为什么一些 RESTful API 事务很慢而其他事务很快。